From d94d973d2475dc8eb1ca7c1238b2bc77919b8282 Mon Sep 17 00:00:00 2001 From: iGxnon Date: Tue, 26 Sep 2023 17:21:06 +0800 Subject: [PATCH] feat: implement xline handle Signed-off-by: iGxnon --- Cargo.lock | 6 +- operator-api/Cargo.toml | 1 + operator-api/src/lib.rs | 2 + operator-api/src/xline.rs | 118 ++++++++++++++++++++--- sidecar/Cargo.toml | 1 + sidecar/src/main.rs | 197 ++++++++++++++++++++++++++++++++++---- sidecar/src/operator.rs | 48 +++++++--- sidecar/src/types.rs | 21 +++- sidecar/src/xline.rs | 19 ++-- 9 files changed, 353 insertions(+), 60 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 958710ee..dae03277 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -115,9 +115,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.71" +version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8" +checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" [[package]] name = "async-channel" @@ -2084,6 +2084,7 @@ dependencies = [ name = "operator-api" version = "0.1.0" dependencies = [ + "anyhow", "async-trait", "k8s-openapi", "kube", @@ -3799,6 +3800,7 @@ dependencies = [ "engine", "event-listener", "futures", + "operator-api", "serde", "serde_json", "thiserror", diff --git a/operator-api/Cargo.toml b/operator-api/Cargo.toml index a94f14e7..af23fc4e 100644 --- a/operator-api/Cargo.toml +++ b/operator-api/Cargo.toml @@ -15,3 +15,4 @@ async-trait = "0.1.72" k8s-openapi = { version = "0.18.0", features = ["v1_26", "schemars"] } kube = { version = "0.83.0", features = ["runtime", "derive", "ws"] } serde = { version = "1.0.130", features = ["derive"] } +anyhow = "1.0.72" diff --git a/operator-api/src/lib.rs b/operator-api/src/lib.rs index 16701eed..fceef081 100644 --- a/operator-api/src/lib.rs +++ b/operator-api/src/lib.rs @@ -3,6 +3,8 @@ mod xline; pub mod consts; +pub use xline::{K8sXlineHandle, LocalXlineHandle, XlineHandle}; + use serde::{Deserialize, Serialize}; /// Heartbeat status diff --git a/operator-api/src/xline.rs b/operator-api/src/xline.rs index 2f0bf515..11a161ef 100644 --- a/operator-api/src/xline.rs +++ b/operator-api/src/xline.rs @@ -2,21 +2,58 @@ use async_trait::async_trait; use k8s_openapi::api::core::v1::Pod; use kube::api::{AttachParams, AttachedProcess}; use kube::Api; +use std::fmt::{Debug, Formatter}; + +use std::process::{Child, Command}; /// xline handle abstraction #[async_trait] -pub trait XlineHandle { - /// the err during start and kill - type Err; - +pub trait XlineHandle: Debug + Send + Sync + 'static { /// start a xline node - async fn start(&mut self) -> Result<(), Self::Err>; + async fn start(&mut self) -> anyhow::Result<()>; // we truly dont care about what failure happened when start, it just failed /// kill a xline node - async fn kill(&mut self) -> Result<(), Self::Err>; + async fn kill(&mut self) -> anyhow::Result<()>; +} + +/// Local xline handle, it will execute the xline in the local +/// machine with the start_cmd +#[derive(Debug)] +pub struct LocalXlineHandle { + start_cmd: String, + child_proc: Option, +} + +impl LocalXlineHandle { + /// New a local xline handle + pub fn new(start_cmd: String) -> Self { + Self { + start_cmd, + child_proc: None, + } + } +} + +#[async_trait] +impl XlineHandle for LocalXlineHandle { + async fn start(&mut self) -> anyhow::Result<()> { + if let Some((exe, args)) = self.start_cmd.split_once(' ') { + let proc = Command::new(exe).args(args.split(' ')).spawn()?; + self.child_proc = Some(proc); + } + unreachable!("the start_cmd must be valid"); + } + + async fn kill(&mut self) -> anyhow::Result<()> { + if let Some(mut proc) = self.child_proc.take() { + return Ok(proc.kill()?); + } + Ok(()) + } } -/// K8s xline handle +/// K8s xline handle, it will execute the xline start_cmd +/// in pod pub struct K8sXlineHandle { /// the pod name pod_name: String, @@ -26,26 +63,79 @@ pub struct K8sXlineHandle { pods_api: Api, /// the attached process of xline process: Option, + /// the xline start cmd, parameters are split by ' ' + start_cmd: String, +} + +impl Debug for K8sXlineHandle { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("K8sXlineHandle") + .field("pod_name", &self.pod_name) + .field("container_name", &self.container_name) + .field("pods_api", &self.pods_api) + .field("start_cmd", &self.start_cmd) + .finish() + } +} + +impl K8sXlineHandle { + /// New with default k8s client + pub async fn new_with_default( + pod_name: String, + container_name: String, + namespace: &str, + start_cmd: String, + ) -> Self { + let client = kube::Client::try_default() + .await + .unwrap_or_else(|_ig| unreachable!("it must be setup in k8s environment")); + Self { + pod_name, + container_name, + pods_api: Api::namespaced(client, namespace), + process: None, + start_cmd, + } + } + + /// New with the provided k8s client + pub fn new_with_client( + pod_name: String, + container_name: String, + client: kube::Client, + namespace: &str, + start_cmd: String, + ) -> Self { + Self { + pod_name, + container_name, + pods_api: Api::namespaced(client, namespace), + process: None, + start_cmd, + } + } } #[async_trait] impl XlineHandle for K8sXlineHandle { - type Err = kube::Error; - - async fn start(&mut self) -> Result<(), Self::Err> { + async fn start(&mut self) -> anyhow::Result<()> { + let start_cmd: Vec<&str> = self.start_cmd.split(' ').collect(); let process = self .pods_api .exec( &self.pod_name, - vec!["sh"], + start_cmd, &AttachParams::default().container(&self.container_name), ) .await?; self.process = Some(process); - todo!() + Ok(()) } - async fn kill(&mut self) -> Result<(), Self::Err> { - todo!() + async fn kill(&mut self) -> anyhow::Result<()> { + if let Some(process) = self.process.take() { + process.abort(); + } + Ok(()) } } diff --git a/sidecar/Cargo.toml b/sidecar/Cargo.toml index c14ae129..40d40a6e 100644 --- a/sidecar/Cargo.toml +++ b/sidecar/Cargo.toml @@ -38,3 +38,4 @@ tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } uuid = { version = "1.4.1", features = ["v4"] } xline-client = { git = "https://github.com/xline-kv/Xline.git", package = "xline-client" } xlineapi = { git = "https://github.com/xline-kv/Xline.git", package = "xlineapi" } +operator-api = { path = "../operator-api" } diff --git a/sidecar/src/main.rs b/sidecar/src/main.rs index 94ea3c0e..256c9bb0 100644 --- a/sidecar/src/main.rs +++ b/sidecar/src/main.rs @@ -138,20 +138,20 @@ use std::collections::HashMap; use std::path::PathBuf; -use anyhow::{anyhow, Result}; +use anyhow::Result; use clap::Parser; use tracing::debug; use xline_sidecar::operator::Operator; -use xline_sidecar::types::{Backup, Config}; +use xline_sidecar::types::{Backend, Backup, Config}; /// Command line interface #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Cli { - /// The name of this node + /// The name of this sidecar, and is shared with xline node name #[arg(long)] - name: String, // used in xline and deployment operator to identify this node + name: String, /// The host ip of each member, [node_name] -> [node_host] #[arg(long, value_parser = parse_members)] members: HashMap, @@ -161,9 +161,6 @@ struct Cli { /// Operator web server port #[arg(long)] operator_port: u16, - /// The xline container name - #[arg(long)] - container_name: String, /// Check health interval, default 20 [unit: seconds] #[arg(long, default_value = "20")] check_interval: u64, @@ -186,6 +183,12 @@ struct Cli { /// e.g "--jaeger_offline true" #[arg(long)] additional: Option, + /// The sidecar backend, when you use different operators, the backend may different. + /// e.g: + /// "k8s,pod=xline-pod-1,container=xline,namespace=default" for k8s backend + /// "local" for local backend + #[arg(long, value_parser=parse_backend)] + backend: Backend, } impl From for Config { @@ -193,12 +196,12 @@ impl From for Config { let mut config = Self { start_cmd: String::new(), name: value.name.clone(), - container_name: value.container_name, xline_port: value.xline_port, operator_port: value.operator_port, check_interval: std::time::Duration::from_secs(value.check_interval), backup: value.backup, members: value.members, + backend: value.backend, }; config.start_cmd = format!( "{} --name {} --members {} --storage-engine {} --data-dir {}", @@ -228,11 +231,10 @@ impl From for Config { /// parse backup type fn parse_backup_type(value: &str) -> Result { - debug!("parse backup type: {}", value); - let mut items: Vec<_> = value.split([':', ' ', ',', '-']).collect(); - if items.is_empty() { + if value.is_empty() { return Err("backup type is empty".to_owned()); } + let mut items: Vec<_> = value.split([':', ' ', ',', '-']).collect(); let backup_type = items.remove(0); match backup_type { "s3" => { @@ -261,19 +263,55 @@ fn parse_backup_type(value: &str) -> Result { } } +/// Parse backend +fn parse_backend(value: &str) -> Result { + if value.is_empty() { + return Err("backend is empty".to_owned()); + } + let mut items: Vec<_> = value.split(',').collect(); + let backend = items.remove(0); + match backend { + "k8s" => { + let mut pod_name = String::new(); + let mut container_name = String::new(); + let mut namespace = String::new(); + while let Some(item) = items.pop() { + let Some((k, v)) = item.split_once('=') else { + return Err(format!("k8s backend got unexpected argument {item}, expect =")); + }; + match k { + "pod" => pod_name = v.to_owned(), + "container" => container_name = v.to_owned(), + "namespace" => namespace = v.to_owned(), + _ => return Err(format!("k8s backend got unexpect argument {item}, expect one of 'pod', 'container', 'namespace'")), + } + } + if pod_name.is_empty() || container_name.is_empty() || namespace.is_empty() { + return Err("k8s backend must set 'pod', 'container', 'namespace'".to_owned()); + } + Ok(Backend::K8s { + pod_name, + container_name, + namespace, + }) + } + "local" => Ok(Backend::Local), + _ => Err(format!("unknown backend: {backend}")), + } +} + /// parse members from string /// # Errors /// Return error when pass wrong args #[inline] -pub fn parse_members(s: &str) -> Result> { +pub fn parse_members(s: &str) -> Result, String> { let mut map = HashMap::new(); for pair in s.split(',') { if let Some((id, addr)) = pair.split_once('=') { let _ignore = map.insert(id.to_owned(), addr.to_owned()); } else { - return Err(anyhow!( - "parse the pair '{}' error, expect '='", - pair + return Err(format!( + "parse the pair '{pair}' error, expect '='", )); } } @@ -292,10 +330,11 @@ async fn main() -> Result<()> { #[cfg(test)] mod test { - use crate::Cli; + use crate::{parse_backend, parse_backup_type, parse_members, Cli}; use clap::Parser; use std::collections::HashMap; - use xline_sidecar::types::{Backup, Config}; + use std::path::PathBuf; + use xline_sidecar::types::{Backend, Backup, Config}; fn full_parameter() -> Vec<&'static str> { vec![ @@ -304,7 +343,6 @@ mod test { "--members=node1=127.0.0.1", "--xline-port=2379", "--operator-port=2380", - "--container-name=xline", "--check-interval=60", "--backup=s3:bucket_name", "--xline-executable=/usr/local/bin/xline", @@ -312,9 +350,123 @@ mod test { "--data-dir=/usr/local/xline/data-dir", "--is-leader", "--additional='--auth-public-key /mnt/public.pem --auth-private-key /mnt/private.pem'", + "--backend=k8s,pod=xline-pod-1,container=xline,namespace=default", ] } + #[test] + fn test_parse_backup_type() { + let test_cases = [ + ("", Err("backup type is empty".to_owned())), + ( + "s3:bucket_name", + Ok(Backup::S3 { + bucket: "bucket_name".to_owned(), + }), + ), + ( + "s3:bucket:name", + Err("s3 backup type requires 1 arguments, got 2".to_owned()), + ), + ( + "s3", + Err("s3 backup type requires 1 arguments, got 0".to_owned()), + ), + ( + "pv:/home", + Ok(Backup::PV { + path: PathBuf::from("/home"), + }), + ), + ( + "pv:/home:/paopao", + Err("pv backup type requires 1 argument, got 2".to_owned()), + ), + ( + "pv", + Err("pv backup type requires 1 argument, got 0".to_owned()), + ), + ( + "_invalid_", + Err("unknown backup type: _invalid_".to_owned()), + ), + ]; + for (test_case, res) in test_cases { + assert_eq!(parse_backup_type(test_case), res); + } + } + + #[test] + fn test_parse_backend() { + let test_cases = [ + ( + "k8s,pod=my-pod,container=my-container,namespace=my-namespace", + Ok(Backend::K8s { + pod_name: "my-pod".to_owned(), + container_name: "my-container".to_owned(), + namespace: "my-namespace".to_owned(), + }), + ), + ("local", Ok(Backend::Local)), + ("", Err("backend is empty".to_owned())), + ( + "k8s,pod=my-pod,invalid-arg,namespace=my-namespace", + Err( + "k8s backend got unexpected argument invalid-arg, expect =" + .to_owned(), + ), + ), + ( + "k8s,pod=my-pod,container=my-container", + Err("k8s backend must set 'pod', 'container', 'namespace'".to_owned()), + ), + ( + "unknown-backend", + Err("unknown backend: unknown-backend".to_owned()), + ), + ]; + for (input, expected) in test_cases { + let result = parse_backend(input); + assert_eq!(result, expected); + } + } + + #[test] + fn test_parse_members() { + let test_cases = vec![ + ( + "id1=addr1,id2=addr2,id3=addr3", + Ok([("id1", "addr1"), ("id2", "addr2"), ("id3", "addr3")] + .iter() + .map(|&(id, addr)| (id.to_owned(), addr.to_owned())) + .collect()), + ), + ( + "id1=addr1", + Ok(std::iter::once(&("id1", "addr1")) + .map(|&(id, addr)| (id.to_owned(), addr.to_owned())) + .collect()), + ), + ( + "", + Err("parse the pair '' error, expect '='".to_owned()), + ), + ( + "id1=addr1,id2", + Err("parse the pair 'id2' error, expect '='".to_owned()), + ), + ( + "id1=addr1,id2=addr2,", + Err("parse the pair '' error, expect '='".to_owned()), + ), + ]; + + for (input, expected) in test_cases { + let result = parse_members(input); + assert_eq!(result, expected); + } + } + #[test] fn test_parse_cli_should_success() { let cli = Cli::parse_from(full_parameter()); @@ -325,7 +477,6 @@ mod test { ); assert_eq!(cli.xline_port, 2379); assert_eq!(cli.operator_port, 2380); - assert_eq!(cli.container_name, "xline"); assert_eq!(cli.check_interval, 60); assert_eq!( cli.backup, @@ -344,6 +495,14 @@ mod test { .to_owned() ) ); + assert_eq!( + cli.backend, + Backend::K8s { + pod_name: "xline-pod-1".to_owned(), + container_name: "xline".to_owned(), + namespace: "default".to_owned(), + } + ); } #[test] diff --git a/sidecar/src/operator.rs b/sidecar/src/operator.rs index e71cd3ac..4d157e39 100644 --- a/sidecar/src/operator.rs +++ b/sidecar/src/operator.rs @@ -5,6 +5,7 @@ use anyhow::{anyhow, Result}; use axum::routing::{get, post}; use axum::{Extension, Router}; use futures::{FutureExt, TryFutureExt}; +use operator_api::{K8sXlineHandle, LocalXlineHandle}; use tokio::sync::watch::{Receiver, Sender}; use tokio::sync::Mutex; use tokio::time::interval; @@ -15,7 +16,7 @@ use crate::backup::Provider; use crate::controller::Controller; use crate::controller::Error; use crate::routers; -use crate::types::{Backup, Config, State, StatePayload}; +use crate::types::{Backend, Backup, Config, State, StatePayload}; use crate::xline::XlineHandle; /// Sidecar operator @@ -41,14 +42,7 @@ impl Operator { pub async fn run(&self) -> Result<()> { let (graceful_shutdown_event, _) = tokio::sync::watch::channel(()); let forceful_shutdown = self.forceful_shutdown(&graceful_shutdown_event); - let backup = self.init_backup(); - let handle = Arc::new(XlineHandle::open( - &self.config.name, - &self.config.container_name, - backup, - self.config.xline_port, - self.config.xline_members(), - )?); + let handle = Arc::new(self.init_xline_handle().await?); let revision = handle.revision_offline().unwrap_or(1); let state = Arc::new(Mutex::new(StatePayload { state: State::Start, @@ -82,9 +76,10 @@ impl Operator { Ok(()) } - /// Initialize backup - fn init_backup(&self) -> Option> { - self.config + /// Initialize xline handle + async fn init_xline_handle(&self) -> Result { + let backup = self + .config .backup .as_ref() .and_then(|backup| match *backup { @@ -95,7 +90,34 @@ impl Operator { }); Some(pv) } - }) + }); + let inner: Box = match self.config.backend.clone() { + Backend::K8s { + pod_name, + container_name, + namespace, + } => { + let handle = K8sXlineHandle::new_with_default( + pod_name, + container_name, + &namespace, + self.config.start_cmd.clone(), + ) + .await; + Box::new(handle) + } + Backend::Local => { + let handle = LocalXlineHandle::new(self.config.start_cmd.clone()); + Box::new(handle) + } + }; + XlineHandle::open( + &self.config.name, + backup, + inner, + self.config.xline_port, + self.config.xline_members(), + ) } /// Forceful shutdown diff --git a/sidecar/src/types.rs b/sidecar/src/types.rs index 87887f85..44f2e162 100644 --- a/sidecar/src/types.rs +++ b/sidecar/src/types.rs @@ -11,8 +11,6 @@ use std::time::Duration; pub struct Config { /// Name of this node pub name: String, - /// Xline container name - pub container_name: String, /// The xline server port pub xline_port: u16, /// The operator web server port @@ -25,6 +23,25 @@ pub struct Config { pub members: HashMap, /// The xline start cmd pub start_cmd: String, + /// The backend + pub backend: Backend, +} + +/// Sidecar backend, it determinate how xline could be setup +#[derive(Debug, Clone, PartialEq, Eq)] +#[non_exhaustive] +pub enum Backend { + /// K8s backend + K8s { + /// The pod name of this node + pod_name: String, + /// The xline container name, used to attach on it + container_name: String, + /// The namespace of this node + namespace: String, + }, + /// Local backend + Local, } /// Backup storage config diff --git a/sidecar/src/xline.rs b/sidecar/src/xline.rs index 30b1b07a..6e38a7eb 100644 --- a/sidecar/src/xline.rs +++ b/sidecar/src/xline.rs @@ -3,6 +3,7 @@ #![allow(clippy::unused_self)] // TODO remove as it is implemented use std::collections::HashMap; +use std::fmt::Debug; use std::time::Duration; use anyhow::{anyhow, Result}; @@ -50,8 +51,6 @@ pub(crate) const XLINE_TABLES: [&str; 6] = [ pub(crate) struct XlineHandle { /// The name of the operator name: String, - /// The xline container name in the pod - container_name: String, /// The xline backup provider backup: Option>, /// The xline client @@ -64,31 +63,33 @@ pub(crate) struct XlineHandle { xline_members: HashMap, /// Health retires of xline client is_healthy_retries: usize, + /// The detailed xline process handle + inner: Box, } impl XlineHandle { /// Create the xline handle but not start the xline node pub(crate) fn open( name: &str, - container_name: &str, backup: Option>, + inner: Box, xline_port: u16, xline_members: HashMap, ) -> Result { - debug!("name: {name}, container_name: {container_name}, backup: {backup:?}, xline_port: {xline_port}"); + debug!("name: {name}, backup: {backup:?}, xline_port: {xline_port}"); let endpoint: Endpoint = format!("http://127.0.0.1:{xline_port}").parse()?; let channel = Channel::balance_list(std::iter::once(endpoint)); let health_client = HealthClient::new(channel); let engine = Engine::new(EngineType::Rocks(DEFAULT_DATA_DIR.parse()?), &XLINE_TABLES)?; Ok(Self { name: name.to_owned(), - container_name: container_name.to_owned(), backup, health_client, engine, client: None, // TODO maybe we could initialize the client here when xline#423 is merged xline_members, is_healthy_retries: 5, + inner, }) } @@ -117,8 +118,7 @@ impl XlineHandle { // the cluster is started if any of the connection is successful let cluster_started = futs.any(|res| async move { res.is_ok() }).await; - // start xline server here - // TODO define a trait to abstract the start of xline server + self.inner.start().await?; let client = Client::connect(self.xline_members.values(), ClientOptions::default()).await?; if cluster_started { @@ -130,11 +130,10 @@ impl XlineHandle { } /// Stop the xline server - pub(crate) async fn stop(&self) -> Result<()> { + pub(crate) async fn stop(&mut self) -> Result<()> { // Step 1: Kill the xline node // Step 2: Remove the xline node from the cluster if the cluster exist - - // kill xline server here + self.inner.kill().await?; if self.is_healthy().await { let _cluster_client = self.client().cluster_client();