diff --git a/Cargo.lock b/Cargo.lock index 7dd855c2..79ec6c14 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -371,6 +371,15 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "castaway" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a17ed5635fc8536268e5d4de1e22e81ac34419e5f052d4d51f4e01dcc263fcc" +dependencies = [ + "rustversion", +] + [[package]] name = "cc" version = "1.0.83" @@ -520,6 +529,19 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "compact_str" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f86b9c4c00838774a6d902ef931eff7470720c51d90c2e32cfe15dc304737b3f" +dependencies = [ + "castaway", + "cfg-if", + "itoa", + "ryu", + "static_assertions", +] + [[package]] name = "concurrent-queue" version = "2.3.0" @@ -562,10 +584,10 @@ dependencies = [ "garde", "k8s-openapi", "kube", + "regex", "schemars", "serde", "serde_json", - "serde_yaml", "tokio", "tracing", ] @@ -1095,20 +1117,23 @@ dependencies = [ [[package]] name = "garde" -version = "0.11.2" +version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d959ef7bda0bda7cc0f6fbebfbac6202f810394f50e07059eeea8ec31e69e4b0" +checksum = "d16596022bab79d38f74999f49b2fa08db8e4e568b43a95a2bd78afbee13f55c" dependencies = [ + "compact_str", "garde_derive", "once_cell", "regex", + "smallvec", + "url", ] [[package]] name = "garde_derive" -version = "0.11.2" +version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e89f7fce035bb3a3718e23efff13709a0b21b694c4eae20a32e1a3e4e27c6a2" +checksum = "38ae3270e3d5914fcf8ac7b3f77d2e7d424018913e62f5f3ade51995da109412" dependencies = [ "proc-macro2", "quote", @@ -3089,6 +3114,12 @@ dependencies = [ "lock_api", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "strsim" version = "0.10.0" @@ -3405,19 +3436,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "toml_edit 0.19.15", -] - -[[package]] -name = "toml" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "185d8ab0dfbb35cf1399a6344d8484209c088f75f8f68230da55d48d95d43e3d" -dependencies = [ - "serde", - "serde_spanned", - "toml_datetime", - "toml_edit 0.20.2", + "toml_edit", ] [[package]] @@ -3442,19 +3461,6 @@ dependencies = [ "winnow", ] -[[package]] -name = "toml_edit" -version = "0.20.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "396e4d48bbb2b7554c944bde63101b5ae446cff6ec4a24227428f15eb72ef338" -dependencies = [ - "indexmap 2.0.2", - "serde", - "serde_spanned", - "toml_datetime", - "winnow", -] - [[package]] name = "tonic" version = "0.9.2" @@ -4108,10 +4114,9 @@ dependencies = [ "kube", "operator-api", "prometheus", - "schemars", "serde", "serde_json", - "thiserror", + "serde_yaml", "tokio", "tracing", "tracing-subscriber", @@ -4131,9 +4136,7 @@ dependencies = [ "operator-api", "reqwest", "serde", - "serde_json", "tokio", - "toml 0.8.2", "tonic", "tonic-health", "tracing", diff --git a/crd-api/Cargo.toml b/crd-api/Cargo.toml index 9ccafbc0..10cf1c64 100644 --- a/crd-api/Cargo.toml +++ b/crd-api/Cargo.toml @@ -12,14 +12,16 @@ keywords = ["operator", "API"] [dependencies] anyhow = "1.0.75" +garde = { version = "0.16.1", default-features = false, features = ["derive", "pattern", "url"] } k8s-openapi = { version = "0.20.0", features = ["v1_28", "schemars"] } kube = { version = "0.86.0", features = ["runtime", "derive"] } +regex = { version = "1", default-features = false, features = ["unicode-perl"] } # garde did not enable regex unicode-perl feature by default, but we need it schemars = "0.8.6" serde = { version = "1.0.130", features = ["derive"] } -serde_json = "1.0.97" +serde_json = "1" tokio = { version = "1.0", features = ["time"] } tracing = "0.1.37" -[dev-dependencies] -garde = { version = "0.11.2", default-features = false, features = ["derive", "pattern"] } -serde_yaml = "0.9.25" +# Some false positive deps... +[package.metadata.cargo-machete] +ignored = ["regex", "serde_json"] diff --git a/crd-api/src/v1alpha1/cluster.rs b/crd-api/src/v1alpha1/cluster.rs index f864f6df..db80454b 100644 --- a/crd-api/src/v1alpha1/cluster.rs +++ b/crd-api/src/v1alpha1/cluster.rs @@ -2,17 +2,16 @@ #![allow(clippy::str_to_string)] #![allow(clippy::missing_docs_in_private_items)] -#[cfg(test)] use garde::Validate; use k8s_openapi::api::core::v1::{Affinity, Container, PersistentVolumeClaim}; use k8s_openapi::serde::{Deserialize, Serialize}; use kube::CustomResource; use schemars::JsonSchema; use std::collections::HashMap; +use std::net::IpAddr; /// Xline cluster specification -#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)] -#[cfg_attr(test, derive(Validate))] +#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema, Validate)] #[kube( group = "xlineoperator.xline.cloud", version = "v1alpha1", @@ -29,63 +28,59 @@ use std::collections::HashMap; printcolumn = r#"{"name":"Backup Cron", "type":"string", "description":"The cron spec defining the interval a backup CronJob is run", "jsonPath":".spec.backup.cron"}"#, printcolumn = r#"{"name":"Age", "type":"date", "description":"The cluster age", "jsonPath":".metadata.creationTimestamp"}"# )] +#[schemars(rename_all = "camelCase")] +#[garde(allow_unvalidated)] pub struct ClusterSpec { /// Size of the xline cluster, less than 3 is not allowed - #[cfg_attr(test, garde(range(min = 3)))] + #[garde(range(min = 3))] #[schemars(range(min = 3))] - pub size: i32, + pub size: usize, /// Xline container specification - #[cfg_attr(test, garde(skip))] pub container: Container, /// The affinity of the xline node - #[cfg_attr(test, garde(skip))] #[serde(skip_serializing_if = "Option::is_none")] pub affinity: Option, /// Backup specification - #[cfg_attr(test, garde(custom(option_backup_dive)))] + #[garde(dive)] #[serde(skip_serializing_if = "Option::is_none")] pub backup: Option, /// The data PVC, if it is not specified, then use emptyDir instead - #[cfg_attr(test, garde(skip))] #[serde(skip_serializing_if = "Option::is_none")] pub data: Option, /// Some user defined persistent volume claim templates - #[cfg_attr(test, garde(skip))] #[serde(skip_serializing_if = "Option::is_none")] pub pvcs: Option>, } /// Xline cluster backup specification -#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)] -#[cfg_attr(test, derive(Validate))] +#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, Validate)] pub struct BackupSpec { /// Cron Spec - #[cfg_attr(test, garde(pattern(r"^(?:\*|[0-5]?\d)(?:[-/,]?(?:\*|[0-5]?\d))*(?: +(?:\*|1?[0-9]|2[0-3])(?:[-/,]?(?:\*|1?[0-9]|2[0-3]))*){4}$")))] + #[garde(pattern(r"^(?:\*|[0-5]?\d)(?:[-/,]?(?:\*|[0-5]?\d))*(?: +(?:\*|1?[0-9]|2[0-3])(?:[-/,]?(?:\*|1?[0-9]|2[0-3]))*){4}$"))] #[schemars(regex( pattern = r"^(?:\*|[0-5]?\d)(?:[-/,]?(?:\*|[0-5]?\d))*(?: +(?:\*|1?[0-9]|2[0-3])(?:[-/,]?(?:\*|1?[0-9]|2[0-3]))*){4}$" ))] pub cron: String, /// Backup storage type - #[cfg_attr(test, garde(dive))] + #[garde(dive)] #[serde(flatten)] pub storage: StorageSpec, } /// Xline cluster backup storage specification -#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)] -#[cfg_attr(test, derive(Validate))] +#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, Validate)] #[serde(untagged)] pub enum StorageSpec { /// S3 backup type S3 { /// S3 backup specification - #[cfg_attr(test, garde(dive))] + #[garde(dive)] s3: S3Spec, }, /// Persistent volume backup type Pvc { /// Persistent volume claim - #[cfg_attr(test, garde(skip))] + #[garde(skip)] pvc: PersistentVolumeClaim, }, } @@ -100,32 +95,24 @@ impl StorageSpec { } /// Xline cluster backup S3 specification -#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)] -#[cfg_attr(test, derive(Validate))] +#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, Validate)] pub struct S3Spec { /// S3 bucket name to use for backup - #[cfg_attr(test, garde(pattern(r"^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$")))] + #[garde(pattern(r"^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$"))] #[schemars(regex(pattern = r"^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$"))] pub bucket: String, } /// Xline cluster status -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)] +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default, Validate)] +#[garde(context(ClusterSpec as ctx))] pub struct ClusterStatus { /// The available nodes' number in the cluster - pub available: i32, + #[garde(range(max = ctx.size))] + pub available: usize, /// The members registry - pub members: HashMap, -} - -#[cfg(test)] -#[allow(clippy::trivially_copy_pass_by_ref)] // required bt garde -fn option_backup_dive(value: &Option, _cx: &()) -> garde::Result { - if let Some(spec) = value.as_ref() { - spec.validate(&()) - .map_err(|e| garde::Error::new(e.to_string()))?; - } - Ok(()) + #[garde(skip)] + pub members: HashMap, } #[cfg(test)] @@ -168,10 +155,10 @@ mod test { pvcs: None, data: None, }; - assert_eq!( - Validate::validate(&bad_size, &()).unwrap_err().flatten()[0].0, - "value.size" - ); + assert!(Validate::validate(&bad_size, &()) + .unwrap_err() + .to_string() + .contains("size")); } #[test] @@ -189,10 +176,10 @@ mod test { pvcs: None, data: None, }; - assert_eq!( - Validate::validate(&bad_cron, &()).unwrap_err().flatten()[0].0, - "value.backup" - ); + assert!(Validate::validate(&bad_cron, &()) + .unwrap_err() + .to_string() + .contains("backup.cron")); } #[test] @@ -212,9 +199,9 @@ mod test { pvcs: None, data: None, }; - assert_eq!( - Validate::validate(&bad_bucket, &()).unwrap_err().flatten()[0].0, - "value.backup" - ); + assert!(Validate::validate(&bad_bucket, &()) + .unwrap_err() + .to_string() + .contains("backup.storage.s3.bucket")) } } diff --git a/operator-api/src/registry.rs b/operator-api/src/registry.rs index 0a90a22b..1b9a6db3 100644 --- a/operator-api/src/registry.rs +++ b/operator-api/src/registry.rs @@ -7,7 +7,9 @@ use k8s_openapi::serde_json::json; use kube::api::{Patch, PatchParams}; use kube::core::object::HasStatus; use kube::{Api, Client}; + use std::collections::HashMap; +use std::net::ToSocketAddrs; pub struct Config { /// Members [node_name] => [node_ip] @@ -23,12 +25,13 @@ pub trait Registry { async fn send_fetch(&self, self_name: String, self_ip: String) -> anyhow::Result; } -pub struct CustomResourceRegistry { +/// K8s custom resource `Cluster` status registry +pub struct K8sClusterStatusRegistry { cluster_name: String, cluster_api: Api, } -impl CustomResourceRegistry { +impl K8sClusterStatusRegistry { /// New a registry with default kube client pub async fn new_with_default(cluster_name: String, namespace: &str) -> Self { let kube_client = Client::try_default() @@ -49,15 +52,23 @@ impl CustomResourceRegistry { } #[async_trait] -impl Registry for CustomResourceRegistry { +impl Registry for K8sClusterStatusRegistry { async fn send_fetch(&self, self_name: String, self_ip: String) -> anyhow::Result { /// TODO: hold a distributed lock here let cluster = self.cluster_api.get_status(&self.cluster_name).await?; let mut status = cluster .status .ok_or_else(|| anyhow!("no status found in cluster {}", self.cluster_name))?; - if status.members.get(&self_name) != Some(&self_ip) { - status.members.insert(self_name, self_ip); + + // dns may not change, but ip can + let ip = format!("{self_ip}:0") + .to_socket_addrs()? + .next() + .ok_or_else(|| anyhow!("cannot resolve dns {self_ip}"))? + .ip(); + + if status.members.get(&self_name) != Some(&ip) { + status.members.insert(self_name, ip); let patch = json!({ "status": status, }); @@ -70,9 +81,14 @@ impl Registry for CustomResourceRegistry { ) .await?; } + Ok(Config { - members: status.members, - cluster_size: cluster.spec.size as usize, + members: status + .members + .into_iter() + .map(|(k, v)| (k, v.to_string())) + .collect(), + cluster_size: cluster.spec.size, }) } } diff --git a/operator-k8s/Cargo.toml b/operator-k8s/Cargo.toml index be24aa06..2984035b 100644 --- a/operator-k8s/Cargo.toml +++ b/operator-k8s/Cargo.toml @@ -24,10 +24,8 @@ k8s-openapi = { version = "0.20.0", features = ["v1_28", "schemars"] } kube = { version = "0.86.0", features = ["runtime", "derive"] } operator-api = { path = "../operator-api" } prometheus = "0.13.3" -schemars = "0.8.6" serde = { version = "1.0.130", features = ["derive"] } serde_json = "1.0.97" -thiserror = "1.0.40" tokio = { version = "1.0", features = [ "rt-multi-thread", "time", @@ -37,3 +35,5 @@ tokio = { version = "1.0", features = [ tracing = "0.1.37" tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } +[dev-dependencies] +serde_yaml = "0.9.25" diff --git a/operator-k8s/src/manager/cluster.rs b/operator-k8s/src/manager/cluster.rs index b3bfce45..ea92d634 100644 --- a/operator-k8s/src/manager/cluster.rs +++ b/operator-k8s/src/manager/cluster.rs @@ -374,7 +374,10 @@ impl Factory { StatefulSet { metadata: self.general_metadata(Component::Nodes), spec: Some(StatefulSetSpec { - replicas: Some(size), + replicas: Some( + i32::try_from(size) + .unwrap_or_else(|_| unreachable!("size should not overflow i32::MAX")), + ), selector: LabelSelector { match_expressions: None, match_labels: Some(labels), diff --git a/operator-k8s/src/monitor.rs b/operator-k8s/src/monitor.rs index 0c05013f..0fd0a3b6 100644 --- a/operator-k8s/src/monitor.rs +++ b/operator-k8s/src/monitor.rs @@ -166,11 +166,7 @@ impl SidecarMonitor { /// Get the certain cluster size in the specification async fn get_spec_size(&self, cluster_name: &str) -> Result { let cluster = self.ctx.cluster_api.get(cluster_name).await?; - Ok(cluster - .spec - .size - .try_into() - .unwrap_or_else(|_| unreachable!("the spec size should not be negative"))) + Ok(cluster.spec.size) } } diff --git a/sidecar/Cargo.toml b/sidecar/Cargo.toml index 08a99161..94bba3d3 100644 --- a/sidecar/Cargo.toml +++ b/sidecar/Cargo.toml @@ -22,14 +22,12 @@ futures = "0.3.28" operator-api = { path = "../operator-api" } reqwest = { version = "0.11", features = ["json"] } serde = { version = "1.0.130", features = ["derive"] } -serde_json = "1.0.97" tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "macros", "net", ] } -toml = "0.8.2" tonic = "0.9.2" tonic-health = "0.9.2" tracing = "0.1.37" diff --git a/sidecar/src/main.rs b/sidecar/src/main.rs index 86175ed3..df0dcc79 100644 --- a/sidecar/src/main.rs +++ b/sidecar/src/main.rs @@ -146,7 +146,7 @@ use operator_api::consts::DEFAULT_DATA_DIR; use operator_api::XlineConfig; use tracing::debug; use xline_sidecar::sidecar::Sidecar; -use xline_sidecar::types::{BackendConfig, BackupConfig, Config, MonitorConfig}; +use xline_sidecar::types::{BackendConfig, BackupConfig, Config, MemberConfig, MonitorConfig}; /// `DEFAULT_DATA_DIR` to String fn default_data_dir() -> String { @@ -216,9 +216,11 @@ impl From for Config { Self { name: value.name.clone(), cluster_name: value.cluster_name, - init_members: value.init_members, - xline_port: value.xline_port, - sidecar_port: value.sidecar_port, + init_member: MemberConfig { + members: value.init_members, + xline_port: value.xline_port, + sidecar_port: value.sidecar_port, + }, reconcile_interval: Duration::from_secs(value.reconcile_interval), backend: value.backend, xline: XlineConfig { diff --git a/sidecar/src/sidecar.rs b/sidecar/src/sidecar.rs index 0ed51955..14694243 100644 --- a/sidecar/src/sidecar.rs +++ b/sidecar/src/sidecar.rs @@ -117,7 +117,7 @@ impl Sidecar { &self.config.xline.data_dir, backup, inner, - self.config.xline_port, + self.config.init_member.xline_port, ) } @@ -138,7 +138,7 @@ impl Sidecar { }; let cluster_name = self.config.cluster_name.clone(); let name = self.config.name.clone(); - let init_members = self.config.init_sidecar_members(); + let init_members = self.config.init_member.sidecar_members(); #[allow(clippy::integer_arithmetic)] // this error originates in the macro `tokio::select` let _ig = tokio::spawn(async move { @@ -183,7 +183,7 @@ impl Sidecar { handle, self.config.reconcile_interval, ); - let init_member_config = self.config.init_member_config(); + let init_member_config = self.config.init_member.clone(); let _ig = tokio::spawn(async move { let mut shutdown = graceful_shutdown; let res = controller @@ -204,7 +204,7 @@ impl Sidecar { state: Arc>, graceful_shutdown: Receiver<()>, ) -> Result<()> { - let members = self.config.init_sidecar_members(); + let members = self.config.init_member.sidecar_members(); let advertise_url = members.get(&self.config.name).ok_or(anyhow!( "node name {} not found in members", self.config.name diff --git a/sidecar/src/types.rs b/sidecar/src/types.rs index 61c33ef7..2a5bcc5c 100644 --- a/sidecar/src/types.rs +++ b/sidecar/src/types.rs @@ -16,12 +16,8 @@ pub struct Config { pub name: String, /// The cluster name pub cluster_name: String, - /// Nodes initial hosts, [pod_name]->[pod_host] - pub init_members: HashMap, - /// The xline server port - pub xline_port: u16, - /// The sidecar web server port - pub sidecar_port: u16, + /// Initial member config + pub init_member: MemberConfig, /// Reconcile cluster interval pub reconcile_interval: Duration, /// The xline config @@ -47,13 +43,14 @@ pub struct MonitorConfig { /// Member config #[derive(Debug, Clone)] -pub(crate) struct MemberConfig { +#[allow(clippy::exhaustive_structs)] // It is only constructed once +pub struct MemberConfig { /// Nodes hosts, [pod_name]->[pod_host] - pub(crate) members: HashMap, + pub members: HashMap, /// The xline server port - pub(crate) xline_port: u16, + pub xline_port: u16, /// The sidecar web server port - pub(crate) sidecar_port: u16, + pub sidecar_port: u16, } /// Sidecar backend, it determinate how xline could be setup @@ -89,35 +86,6 @@ pub enum BackupConfig { }, } -impl Config { - /// Get the initial sidecar members - pub(crate) fn init_sidecar_members(&self) -> HashMap { - self.init_members - .clone() - .into_iter() - .map(|(name, host)| (name, format!("{host}:{}", self.sidecar_port))) - .collect() - } - - /// Get the initial xline members - pub(crate) fn init_xline_members(&self) -> HashMap { - self.init_members - .clone() - .into_iter() - .map(|(name, host)| (name, format!("{host}:{}", self.xline_port))) - .collect() - } - - /// Get the initial member config - pub(crate) fn init_member_config(&self) -> MemberConfig { - MemberConfig { - members: self.init_members.clone(), - xline_port: self.xline_port, - sidecar_port: self.sidecar_port, - } - } -} - impl MemberConfig { /// Get the xline members pub(crate) fn xline_members(&self) -> HashMap { diff --git a/sidecar/src/xline.rs b/sidecar/src/xline.rs index b8f403d3..1d97e415 100644 --- a/sidecar/src/xline.rs +++ b/sidecar/src/xline.rs @@ -112,6 +112,14 @@ impl XlineHandle { Ok(()) } + /// Update member update the client + /// TODO: should xline client automatically discovery xlines? + pub(crate) async fn update_member(&mut self, xlines: &HashMap) -> Result<()> { + let new_client = Client::connect(xlines.values(), ClientOptions::default()).await?; + _ = self.client.replace(new_client); + Ok(()) + } + /// Start the xline server pub(crate) async fn start(&mut self, xlines: &HashMap) -> Result<()> { /// Timeout for test start