Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/Config discovery #37

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
885 changes: 524 additions & 361 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[workspace]
members = [
"crd-api",
"operator-api",
"operator-k8s",
"sidecar",
Expand Down
17 changes: 0 additions & 17 deletions assets/xline_conf.tera

This file was deleted.

27 changes: 27 additions & 0 deletions crd-api/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[package]
name = "crd-api"
version = "0.1.0"
edition = "2021"
authors = ["DatenLord <[email protected]>"]
description = "Custom Resource Definition"
license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/xline-kv/xline-operator/tree/main/crd-api"
categories = ["API"]
keywords = ["operator", "API"]

[dependencies]
anyhow = "1.0.75"
garde = { version = "0.16.1", default-features = false, features = ["derive", "pattern", "url"] }
k8s-openapi = { version = "0.20.0", features = ["v1_28", "schemars"] }
kube = { version = "0.86.0", features = ["runtime", "derive"] }
regex = { version = "1", default-features = false, features = ["unicode-perl"] } # garde did not enable regex unicode-perl feature by default, but we need it
schemars = "0.8.6"
serde = { version = "1.0.130", features = ["derive"] }
serde_json = "1"
tokio = { version = "1.0", features = ["time"] }
tracing = "0.1.37"

# Some false positive deps...
[package.metadata.cargo-machete]
ignored = ["regex", "serde_json"]
42 changes: 42 additions & 0 deletions crd-api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/// v1alpha1
/// Features:
/// 1. Xline sidecar
/// 2. PV backup
pub mod v1alpha1;

/// CRD version
pub mod version;

/// Current CRD `XineCluster`
pub use v1alpha1::Cluster;

use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
use kube::runtime::conditions;
use kube::runtime::wait::await_condition;
use kube::{Api, Client};
use tracing::debug;

use std::time::Duration;

/// wait crd to establish timeout
const CRD_ESTABLISH_TIMEOUT: Duration = Duration::from_secs(20);

/// Setup CRD
pub async fn setup(
kube_client: &Client,
manage_crd: bool,
auto_migration: bool,
) -> anyhow::Result<()> {
v1alpha1::set_up(kube_client, manage_crd, auto_migration).await
}

/// Wait for CRD to be established
async fn wait_crd_established(
crd_api: Api<CustomResourceDefinition>,
crd_name: &str,
) -> anyhow::Result<()> {
let establish = await_condition(crd_api, crd_name, conditions::is_crd_established());
debug!("wait for crd established");
_ = tokio::time::timeout(CRD_ESTABLISH_TIMEOUT, establish).await??;
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@
#![allow(clippy::str_to_string)]
#![allow(clippy::missing_docs_in_private_items)]

#[cfg(test)]
use garde::Validate;
use k8s_openapi::api::core::v1::{Affinity, Container, PersistentVolumeClaim};
use k8s_openapi::serde::{Deserialize, Serialize};
use kube::CustomResource;
use schemars::JsonSchema;

/// Xline cluster specification
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[cfg_attr(test, derive(Validate))]
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema, Validate)]
#[kube(
group = "xlineoperator.xline.cloud",
version = "v1alpha1",
Expand All @@ -28,69 +26,65 @@ use schemars::JsonSchema;
printcolumn = r#"{"name":"Backup Cron", "type":"string", "description":"The cron spec defining the interval a backup CronJob is run", "jsonPath":".spec.backup.cron"}"#,
printcolumn = r#"{"name":"Age", "type":"date", "description":"The cluster age", "jsonPath":".metadata.creationTimestamp"}"#
)]
pub(crate) struct ClusterSpec {
#[schemars(rename_all = "camelCase")]
#[garde(allow_unvalidated)]
pub struct ClusterSpec {
/// Size of the xline cluster, less than 3 is not allowed
#[cfg_attr(test, garde(range(min = 3)))]
#[garde(range(min = 3))]
#[schemars(range(min = 3))]
pub(crate) size: i32,
pub size: usize,
/// Xline container specification
#[cfg_attr(test, garde(skip))]
pub(crate) container: Container,
pub container: Container,
/// The affinity of the xline node
#[cfg_attr(test, garde(skip))]
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) affinity: Option<Affinity>,
pub affinity: Option<Affinity>,
/// Backup specification
#[cfg_attr(test, garde(custom(option_backup_dive)))]
#[garde(dive)]
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) backup: Option<BackupSpec>,
pub backup: Option<BackupSpec>,
/// The data PVC, if it is not specified, then use emptyDir instead
#[cfg_attr(test, garde(skip))]
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) data: Option<PersistentVolumeClaim>,
pub data: Option<PersistentVolumeClaim>,
/// Some user defined persistent volume claim templates
#[cfg_attr(test, garde(skip))]
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) pvcs: Option<Vec<PersistentVolumeClaim>>,
pub pvcs: Option<Vec<PersistentVolumeClaim>>,
}

/// Xline cluster backup specification
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
#[cfg_attr(test, derive(Validate))]
pub(crate) struct BackupSpec {
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, Validate)]
pub struct BackupSpec {
/// Cron Spec
#[cfg_attr(test, garde(pattern(r"^(?:\*|[0-5]?\d)(?:[-/,]?(?:\*|[0-5]?\d))*(?: +(?:\*|1?[0-9]|2[0-3])(?:[-/,]?(?:\*|1?[0-9]|2[0-3]))*){4}$")))]
#[garde(pattern(r"^(?:\*|[0-5]?\d)(?:[-/,]?(?:\*|[0-5]?\d))*(?: +(?:\*|1?[0-9]|2[0-3])(?:[-/,]?(?:\*|1?[0-9]|2[0-3]))*){4}$"))]
#[schemars(regex(
pattern = r"^(?:\*|[0-5]?\d)(?:[-/,]?(?:\*|[0-5]?\d))*(?: +(?:\*|1?[0-9]|2[0-3])(?:[-/,]?(?:\*|1?[0-9]|2[0-3]))*){4}$"
))]
pub(crate) cron: String,
pub cron: String,
/// Backup storage type
#[cfg_attr(test, garde(dive))]
#[garde(dive)]
#[serde(flatten)]
pub(crate) storage: StorageSpec,
pub storage: StorageSpec,
}

/// Xline cluster backup storage specification
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
#[cfg_attr(test, derive(Validate))]
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, Validate)]
#[serde(untagged)]
pub(crate) enum StorageSpec {
pub enum StorageSpec {
/// S3 backup type
S3 {
/// S3 backup specification
#[cfg_attr(test, garde(dive))]
#[garde(dive)]
s3: S3Spec,
},
/// Persistent volume backup type
Pvc {
/// Persistent volume claim
#[cfg_attr(test, garde(skip))]
#[garde(skip)]
pvc: PersistentVolumeClaim,
},
}

impl StorageSpec {
pub(crate) fn as_pvc(&self) -> Option<&PersistentVolumeClaim> {
pub fn as_pvc(&self) -> Option<&PersistentVolumeClaim> {
match *self {
Self::Pvc { ref pvc } => Some(pvc),
Self::S3 { .. } => None,
Expand All @@ -99,30 +93,21 @@ impl StorageSpec {
}

/// Xline cluster backup S3 specification
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
#[cfg_attr(test, derive(Validate))]
pub(crate) struct S3Spec {
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, Validate)]
pub struct S3Spec {
/// S3 bucket name to use for backup
#[cfg_attr(test, garde(pattern(r"^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$")))]
#[garde(pattern(r"^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$"))]
#[schemars(regex(pattern = r"^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$"))]
pub(crate) bucket: String,
pub bucket: String,
}

/// Xline cluster status
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)]
pub(crate) struct ClusterStatus {
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default, Validate)]
#[garde(context(ClusterSpec as ctx))]
pub struct ClusterStatus {
/// The available nodes' number in the cluster
pub(crate) available: i32,
}

#[cfg(test)]
#[allow(clippy::trivially_copy_pass_by_ref)] // required bt garde
fn option_backup_dive(value: &Option<BackupSpec>, _cx: &()) -> garde::Result {
if let Some(spec) = value.as_ref() {
spec.validate(&())
.map_err(|e| garde::Error::new(e.to_string()))?;
}
Ok(())
#[garde(range(max = ctx.size))]
pub available: usize,
}

#[cfg(test)]
Expand Down Expand Up @@ -165,10 +150,10 @@ mod test {
pvcs: None,
data: None,
};
assert_eq!(
Validate::validate(&bad_size, &()).unwrap_err().flatten()[0].0,
"value.size"
);
assert!(Validate::validate(&bad_size, &())
.unwrap_err()
.to_string()
.contains("size"));
}

#[test]
Expand All @@ -186,10 +171,10 @@ mod test {
pvcs: None,
data: None,
};
assert_eq!(
Validate::validate(&bad_cron, &()).unwrap_err().flatten()[0].0,
"value.backup"
);
assert!(Validate::validate(&bad_cron, &())
.unwrap_err()
.to_string()
.contains("backup.cron"));
}

#[test]
Expand All @@ -209,9 +194,9 @@ mod test {
pvcs: None,
data: None,
};
assert_eq!(
Validate::validate(&bad_bucket, &()).unwrap_err().flatten()[0].0,
"value.backup"
);
assert!(Validate::validate(&bad_bucket, &())
.unwrap_err()
.to_string()
.contains("backup.storage.s3.bucket"))
}
}
Loading
Loading