Skip to content

Commit

Permalink
feat: implement registry
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <[email protected]>
  • Loading branch information
iGxnon committed Oct 30, 2023
1 parent d7d1fbc commit 65e1528
Show file tree
Hide file tree
Showing 16 changed files with 585 additions and 100 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 0 additions & 5 deletions crd-api/src/v1alpha1/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ use k8s_openapi::api::core::v1::{Affinity, Container, PersistentVolumeClaim};
use k8s_openapi::serde::{Deserialize, Serialize};
use kube::CustomResource;
use schemars::JsonSchema;
use std::collections::HashMap;
use std::net::IpAddr;

/// Xline cluster specification
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema, Validate)]
Expand Down Expand Up @@ -110,9 +108,6 @@ pub struct ClusterStatus {
/// The available nodes' number in the cluster
#[garde(range(max = ctx.size))]
pub available: usize,
/// The members registry
#[garde(skip)]
pub members: HashMap<String, IpAddr>,
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions operator-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ k8s-openapi = { version = "0.20.0", features = ["v1_28", "schemars"] }
kube = { version = "0.86.0", features = ["runtime", "derive", "ws"] }
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0.130", features = ["derive"] }
tokio = { version = "1.0", features = ["time"] }
2 changes: 2 additions & 0 deletions operator-api/src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ pub const DEFAULT_BACKUP_DIR: &str = "/xline-backup";
pub const DEFAULT_DATA_DIR: &str = "/usr/local/xline/data-dir";
/// the URL ROUTE that sidecar sends heartbeat status to
pub const OPERATOR_MONITOR_ROUTE: &str = "/monitor";
/// the URL ROUTE that sidecar sends fetch config to
pub const OPERATOR_REGISTRY_ROUTE: &str = "/registry";
/// the URL ROUTE of each sidecar for backup
pub const SIDECAR_BACKUP_ROUTE: &str = "/backup";
/// the URL ROUTE of each sidecar member for health checking
Expand Down
237 changes: 181 additions & 56 deletions operator-api/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,92 +3,217 @@
use anyhow::anyhow;
use async_trait::async_trait;
use crd_api::Cluster;
use k8s_openapi::api::apps::v1::StatefulSet;
use k8s_openapi::serde_json::json;
use kube::api::{Patch, PatchParams};
use kube::core::object::HasStatus;
use kube::{Api, Client};

use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::ToSocketAddrs;
use std::sync::OnceLock;
use std::time::Duration;

use crate::consts::OPERATOR_REGISTRY_ROUTE;

pub use dummy::*;
pub use http::*;
pub use k8s::*;

#[derive(Deserialize, Debug, Default, Clone, Serialize)]
pub struct Config {
/// Members [node_name] => [node_ip]
/// Members [node_name] => [node_host]
pub members: HashMap<String, String>,
/// Cluster size
pub cluster_size: usize,
}

const WAIT_DELAY: Duration = Duration::from_secs(5);
const WAIT_THRESHOLD: usize = 60;

#[async_trait]
pub trait Registry {
const FIELD_MANAGER: &'static str = "xlineoperator.datenlord.io/registry";
async fn send_fetch(&self, self_name: String, self_host: String) -> anyhow::Result<Config>;

async fn send_fetch(&self, self_name: String, self_ip: String) -> anyhow::Result<Config>;
async fn wait_full_fetch(
&self,
self_name: String,
self_host: String,
) -> anyhow::Result<Config> {
let mut retry = 0;
loop {
let config = self
.send_fetch(self_name.clone(), self_host.clone())
.await?;
if config.members.len() == config.cluster_size {
break Ok(config);
}
retry += 1;
if retry > WAIT_THRESHOLD {
break Err(anyhow!("wait for full config timeout"));
}
tokio::time::sleep(WAIT_DELAY).await;
}
}
}

/// K8s custom resource `Cluster` status registry
pub struct K8sClusterStatusRegistry {
cluster_name: String,
cluster_api: Api<Cluster>,
}
mod k8s {
use super::*;

impl K8sClusterStatusRegistry {
/// New a registry with default kube client
pub async fn new_with_default(cluster_name: String, namespace: &str) -> Self {
let kube_client = Client::try_default()
.await
.unwrap_or_else(|_ig| unreachable!("it must be setup in k8s environment"));
Self {
cluster_name,
cluster_api: Api::namespaced(kube_client, namespace),
const FIELD_MANAGER: &str = "xlineoperator.datenlord.io/registry";

/// K8s statefulset registry
pub struct K8sStsRegistry {
sts_name: String,
sts_api: Api<StatefulSet>,
namespace: String,
dns_suffix: String,
}

impl K8sStsRegistry {
/// New a k8s statefulset registry with default kube client
pub async fn new_with_default(
sts_name: String,
namespace: String,
dns_suffix: String,
) -> Self {
let kube_client = Client::try_default()
.await
.unwrap_or_else(|_ig| unreachable!("it must be setup in k8s environment"));
Self {
sts_name,
sts_api: Api::namespaced(kube_client, &namespace),
namespace,
dns_suffix,
}
}

pub fn new(
sts_name: String,
namespace: String,
kube_client: Client,
dns_suffix: String,
) -> Self {
Self {
sts_name,
sts_api: Api::namespaced(kube_client, &namespace),
namespace,
dns_suffix,
}
}
}

pub async fn new(cluster_name: String, namespace: &str, kube_client: Client) -> Self {
Self {
cluster_name,
cluster_api: Api::namespaced(kube_client, namespace),
#[async_trait]
impl Registry for K8sStsRegistry {
async fn send_fetch(&self, _: String, _: String) -> anyhow::Result<Config> {
let sts = self.sts_api.get(&self.sts_name).await?;
let spec = sts
.spec
.unwrap_or_else(|| unreachable!(".spec should be set in statefulset"));
let replicas = spec
.replicas
.unwrap_or_else(|| unreachable!(".spec.replicas should be set in statefulset"));
let start_at = spec
.ordinals
.into_iter()
.flat_map(|ordinal| ordinal.start)
.next()
.unwrap_or(0);
let members = (start_at..)
.take(replicas as usize)
.map(|idx| {
let name = format!("{}-{idx}", self.sts_name);
let host = format!(
"{name}.{}.{}.svc.{}",
spec.service_name, self.namespace, self.dns_suffix
);
(name, host)
})
.collect();
Ok(Config {
members,
cluster_size: replicas as usize,
})
}
}
}

#[async_trait]
impl Registry for K8sClusterStatusRegistry {
async fn send_fetch(&self, self_name: String, self_ip: String) -> anyhow::Result<Config> {
/// TODO: hold a distributed lock here
let cluster = self.cluster_api.get_status(&self.cluster_name).await?;
let mut status = cluster
.status
.ok_or_else(|| anyhow!("no status found in cluster {}", self.cluster_name))?;

// dns may not change, but ip can
let ip = format!("{self_ip}:0")
.to_socket_addrs()?
.next()
.ok_or_else(|| anyhow!("cannot resolve dns {self_ip}"))?
.ip();

if status.members.get(&self_name) != Some(&ip) {
status.members.insert(self_name, ip);
let patch = json!({
"status": status,
mod http {
use super::*;

static DEFAULT_HTTP_CLIENT: OnceLock<reqwest::Client> = OnceLock::new();

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct RegisterQuery {
pub cluster: String,
pub name: String,
pub host: String,
}

/// HTTP server registry
pub struct HttpRegistry {
server_addr: String,
cluster_name: String,
}

impl HttpRegistry {
pub fn new(server_addr: String, cluster_name: String) -> Self {
Self {
server_addr,
cluster_name,
}
}
}

#[async_trait]
impl Registry for HttpRegistry {
async fn send_fetch(&self, self_name: String, self_host: String) -> anyhow::Result<Config> {
let client = DEFAULT_HTTP_CLIENT.get_or_init(|| {
reqwest::Client::builder().build().unwrap_or_else(|err| {
unreachable!("cannot build http client to register config, err: {err}")
})
});
let _ig = self
.cluster_api
.patch_status(
&self.cluster_name,
&PatchParams::apply(Self::FIELD_MANAGER),
&Patch::Apply(patch),
)
let config: Config = client
.get(format!(
"http://{}{}",
self.server_addr, OPERATOR_REGISTRY_ROUTE
))
.query(&RegisterQuery {
cluster: self.cluster_name.clone(),
name: self_name,
host: self_host,
})
.send()
.await?
.json()
.await?;
Ok(config)
}
}
}

mod dummy {
use super::*;

/// Dummy registry does not register anything, it keeps the original config
pub struct DummyRegistry {
init_members: HashMap<String, String>,
}

impl DummyRegistry {
pub fn new(init_members: HashMap<String, String>) -> Self {
Self { init_members }
}
}

Ok(Config {
members: status
.members
.into_iter()
.map(|(k, v)| (k, v.to_string()))
.collect(),
cluster_size: cluster.spec.size,
})
#[async_trait]
impl Registry for DummyRegistry {
async fn send_fetch(&self, _: String, _: String) -> anyhow::Result<Config> {
Ok(Config {
members: self.init_members.clone(),
cluster_size: self.init_members.len(),
})
}
}
}
5 changes: 4 additions & 1 deletion operator-k8s/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ pub struct Config {
#[arg(long, default_value = "cluster.local")]
pub cluster_suffix: String,
/// Maximum interval between accepted `HeartbeatStatus`
#[arg(long, default_value = "2")]
#[arg(long, default_value = "5")]
pub heartbeat_period: u64,
/// Sidecar unreachable counter threshold
#[arg(long, default_value = "4")]
pub unreachable_thresh: usize,
/// Sidecar config time to live, in seconds. No longer than 300.
#[arg(long, default_value = "30")]
pub registry_ttl: u64,
}

/// The namespace to work, `ClusterWide` means work with all namespaces
Expand Down
7 changes: 7 additions & 0 deletions operator-k8s/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@
)
)]

use std::collections::HashMap;

/// A sidecar cluster states map
type SidecarClusterOwned<T> = HashMap<String, T>;

/// Xline operator config
pub mod config;
/// Some constants
Expand All @@ -161,6 +166,8 @@ mod manager;
mod monitor;
/// Xline operator
pub mod operator;
/// HTTP server registry for sidecars to find each other
mod registry;
/// Xline operator web server router
mod router;

Expand Down
8 changes: 3 additions & 5 deletions operator-k8s/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use operator_api::HeartbeatStatus;
use tracing::{debug, error};

use crate::crd::Cluster;
use crate::SidecarClusterOwned;

/// Sidecar monitor context
struct Context {
Expand All @@ -27,9 +28,6 @@ struct Context {
pod_api: Api<Pod>,
}

/// A sidecar cluster states map
type SidecarClusterOwned<T> = HashMap<String, T>;

/// Sidecar monitor.
/// It monitors the communication of all sidecars, finds and tries to recover the dropped sidecar.
pub(crate) struct SidecarMonitor {
Expand Down Expand Up @@ -65,7 +63,7 @@ impl SidecarMonitor {

/// Run the state update task with graceful shutdown.
/// Return fatal error if run failed.
/// The task that update the state received from sidecar operators
/// The task that update the state received from sidecars
#[allow(clippy::integer_arithmetic)] // required by tokio::select
pub(crate) async fn run_with_graceful_shutdown(
self,
Expand All @@ -81,7 +79,7 @@ impl SidecarMonitor {
}
}

/// Inner task for state update, return the unrecoverable error
/// Task for state update, return the unrecoverable error
async fn state_update(mut self) -> Result<()> {
loop {
let status = self.ctx.status_rx.recv_async().await?;
Expand Down
Loading

0 comments on commit 65e1528

Please sign in to comment.