diff --git a/sidecar/src/controller.rs b/sidecar/src/controller.rs
index abe583ce..2b5b358f 100644
--- a/sidecar/src/controller.rs
+++ b/sidecar/src/controller.rs
@@ -6,20 +6,22 @@ use std::time::Duration;
 
 use anyhow::Result;
 use tokio::select;
-use tokio::sync::Mutex;
+use tokio::sync::{Mutex, RwLock};
 use tokio::time::{interval, MissedTickBehavior};
-use tracing::debug;
+use tracing::{debug, error, info};
 
-use crate::types::StatePayload;
+use crate::types::{MemberConfig, State, StatePayload, StateStatus};
 use crate::xline::XlineHandle;
 
 /// Sidecar operator controller
 #[derive(Debug)]
 pub(crate) struct Controller {
+    /// The name of this sidecar
+    name: String,
     /// The state of this operator
     state: Arc<Mutex<StatePayload>>,
     /// Xline handle
-    handle: Arc<XlineHandle>,
+    handle: Arc<RwLock<XlineHandle>>,
     /// Check interval
     reconcile_interval: Duration,
 }
@@ -27,11 +29,13 @@ pub(crate) struct Controller {
 impl Controller {
     /// Constructor
     pub(crate) fn new(
+        name: String,
         state: Arc<Mutex<StatePayload>>,
-        handle: Arc<XlineHandle>,
+        handle: Arc<RwLock<XlineHandle>>,
         reconcile_interval: Duration,
     ) -> Self {
         Self {
+            name,
             state,
             handle,
             reconcile_interval,
@@ -42,26 +46,35 @@ impl Controller {
     #[allow(clippy::integer_arithmetic)] // this error originates in the macro `tokio::select`
     pub(crate) async fn run_reconcile_with_shutdown(
         self,
+        init_member_config: MemberConfig,
         graceful_shutdown: impl Future<Output = ()>,
     ) -> Result<()> {
         select! {
             _ = graceful_shutdown => {
                 Ok(())
             }
-            res = self.run_reconcile() => {
+            res = self.run_reconcile(init_member_config) => {
                 res
             }
         }
     }
 
     /// Run reconcile loop
-    pub(crate) async fn run_reconcile(self) -> Result<()> {
+    pub(crate) async fn run_reconcile(self, init_member_config: MemberConfig) -> Result<()> {
         let mut tick = interval(self.reconcile_interval);
         tick.set_missed_tick_behavior(MissedTickBehavior::Skip);
+        let member_config = init_member_config;
+        self.handle
+            .write()
+            .await
+            .start(&member_config.xline_members())
+            .await?;
         loop {
             let instant = tick.tick().await;
-            let _result = self.evaluate().await;
-            let _result1 = self.execute().await;
+            if let Err(err) = self.reconcile_once(&member_config).await {
+                error!("reconcile failed, error: {err}");
+                continue;
+            }
             debug!(
                 "successfully reconcile the cluster states within {:?}",
                 instant.elapsed()
@@ -69,17 +82,80 @@ impl Controller {
         }
     }
 
-    /// Evaluate cluster states
-    #[allow(clippy::unused_async)] // TODO remove when it is implemented
-    async fn evaluate(&self) -> Result<()> {
-        // TODO evaluate states
+    /// Reconcile inner
+    async fn reconcile_once(&self, member_config: &MemberConfig) -> Result<()> {
+        let mut handle = self.handle.write().await;
+
+        let sidecar_members = member_config.sidecar_members();
+        let xline_members = member_config.xline_members();
+        let cluster_size = member_config.members.len();
+        let majority = member_config.majority_cnt();
+
+        let cluster_health = handle.is_healthy().await;
+        let xline_running = handle.is_running().await;
+        let states = StateStatus::gather(&sidecar_members).await?;
+
+        match (cluster_health, xline_running) {
+            (true, true) => {
+                self.set_state(State::OK).await;
+
+                info!("status: cluster healthy + xline running");
+            }
+            (true, false) => {
+                self.set_state(State::Pending).await;
+
+                info!("status: cluster healthy + xline not running, joining the cluster");
+                handle.start(&xline_members).await?;
+            }
+            (false, true) => {
+                self.set_state(State::Pending).await;
+
+                if states
+                    .states
+                    .get(&State::OK)
+                    .is_some_and(|c| *c >= majority)
+                {
+                    info!("status: cluster unhealthy + xline running + quorum ok, waiting...");
+                } else {
+                    info!(
+                        "status: cluster unhealthy + xline running + quorum loss, backup and start failure recovery"
+                    );
+                    handle.backup().await?;
+                    handle.stop().await?;
+                }
+            }
+            (false, false) => {
+                let is_seeder = states.seeder == self.name;
+                if !is_seeder {
+                    info!("status: cluster unhealthy + xline not running + not seeder, try to install backup");
+                    handle.install_backup().await?;
+                }
+
+                self.set_state(State::Start).await;
+
+                if states
+                    .states
+                    .get(&State::Start)
+                    .is_some_and(|c| *c != cluster_size)
+                {
+                    info!("status: cluster unhealthy + xline not running + wait all start");
+                    return Ok(());
+                }
+
+                if is_seeder {
+                    info!(
+                        "status: cluster unhealthy + xline not running + all start + seeder, seed cluster"
+                    );
+                    handle.start(&xline_members).await?;
+                }
+            }
+        }
+        Ok(())
     }
 
-    /// Execute reconciliation based on evaluation
-    #[allow(clippy::unused_async)] // TODO remove when it is implemented
-    async fn execute(&self) -> Result<()> {
-        // TODO execute reconciliation
-        Ok(())
+    /// Set current state
+    async fn set_state(&self, state: State) {
+        self.state.lock().await.state = state;
     }
 }
diff --git a/sidecar/src/routers.rs b/sidecar/src/routers.rs
index a846f0e9..fdcb3261 100644
--- a/sidecar/src/routers.rs
+++ b/sidecar/src/routers.rs
@@ -2,7 +2,7 @@ use std::sync::Arc;
 
 use axum::http::StatusCode;
 use axum::{Extension, Json};
-use tokio::sync::Mutex;
+use tokio::sync::{Mutex, RwLock};
 
 use crate::types::{MembershipChange, StatePayload};
 use crate::utils::{check_backup_volume, check_data_volume};
@@ -22,8 +22,8 @@ pub(crate) async fn health() -> StatusCode {
 }
 
 /// Backup hook
-pub(crate) async fn backup(Extension(handle): Extension<Arc<XlineHandle>>) -> StatusCode {
-    if handle.backup().await.is_err() {
+pub(crate) async fn backup(Extension(handle): Extension<Arc<RwLock<XlineHandle>>>) -> StatusCode {
+    if handle.read().await.backup().await.is_err() {
         return StatusCode::INTERNAL_SERVER_ERROR;
     }
     StatusCode::OK
diff --git a/sidecar/src/sidecar.rs b/sidecar/src/sidecar.rs
index 575c8df5..0ed51955 100644
--- a/sidecar/src/sidecar.rs
+++ b/sidecar/src/sidecar.rs
@@ -8,7 +8,7 @@ use futures::{FutureExt, TryFutureExt};
 use operator_api::consts::{SIDECAR_BACKUP_ROUTE, SIDECAR_HEALTH_ROUTE, SIDECAR_STATE_ROUTE};
 use operator_api::{HeartbeatStatus, K8sXlineHandle, LocalXlineHandle};
 use tokio::sync::watch::{Receiver, Sender};
-use tokio::sync::Mutex;
+use tokio::sync::{Mutex, RwLock};
 use tokio::time::{interval, MissedTickBehavior};
 use tracing::{debug, error, info, warn};
 
@@ -42,8 +42,8 @@ impl Sidecar {
     pub async fn run(&self) -> Result<()> {
         let (graceful_shutdown_event, _) = tokio::sync::watch::channel(());
         let forceful_shutdown = self.forceful_shutdown(&graceful_shutdown_event);
-        let handle = Arc::new(self.init_xline_handle().await?);
-        let revision = handle.revision_offline().unwrap_or(1);
+        let handle = Arc::new(RwLock::new(self.init_xline_handle().await?));
+        let revision = handle.read().await.revision_offline().unwrap_or(1);
         let state = Arc::new(Mutex::new(StatePayload {
             state: State::Start,
             revision,
@@ -114,10 +114,10 @@ impl Sidecar {
         };
         XlineHandle::open(
             &self.config.name,
+            &self.config.xline.data_dir,
             backup,
             inner,
             self.config.xline_port,
-            self.config.init_xline_members(),
         )
     }
 
@@ -173,15 +173,21 @@ impl Sidecar {
     /// Start controller
     fn start_controller(
         &self,
-        handle: Arc<XlineHandle>,
+        handle: Arc<RwLock<XlineHandle>>,
         state: Arc<Mutex<StatePayload>>,
         graceful_shutdown: Receiver<()>,
     ) {
-        let controller = Controller::new(state, handle, self.config.reconcile_interval);
+        let controller = Controller::new(
+            self.config.name.clone(),
+            state,
+            handle,
+            self.config.reconcile_interval,
+        );
+        let init_member_config = self.config.init_member_config();
         let _ig = tokio::spawn(async move {
             let mut shutdown = graceful_shutdown;
             let res = controller
-                .run_reconcile_with_shutdown(shutdown.changed().map(|_| ()))
+                .run_reconcile_with_shutdown(init_member_config, shutdown.changed().map(|_| ()))
                 .await;
             if let Err(err) = res {
                 error!("controller run failed, error: {err}");
@@ -194,7 +200,7 @@ impl Sidecar {
     /// Run a web server to expose current state to other sidecar operators and k8s
     fn start_web_server(
         &self,
-        handle: Arc<XlineHandle>,
+        handle: Arc<RwLock<XlineHandle>>,
         state: Arc<Mutex<StatePayload>>,
         graceful_shutdown: Receiver<()>,
     ) -> Result<()> {
diff --git a/sidecar/src/types.rs b/sidecar/src/types.rs
index 5d7a3326..61c33ef7 100644
--- a/sidecar/src/types.rs
+++ b/sidecar/src/types.rs
@@ -4,7 +4,7 @@ use operator_api::XlineConfig;
 use serde::{Deserialize, Serialize};
 
 use std::collections::HashMap;
-use std::ops::AddAssign;
+use std::ops::{Add, AddAssign, Div};
 use std::path::PathBuf;
 use std::time::Duration;
 
@@ -16,7 +16,7 @@ pub struct Config {
     pub name: String,
     /// The cluster name
     pub cluster_name: String,
-    /// Sidecar initial hosts, [pod_name]->[pod_host]
+    /// Nodes initial hosts, [pod_name]->[pod_host]
     pub init_members: HashMap<String, String>,
     /// The xline server port
     pub xline_port: u16,
@@ -45,6 +45,17 @@ pub struct MonitorConfig {
     pub heartbeat_interval: Duration,
 }
 
+/// Member config
+#[derive(Debug, Clone)]
+pub(crate) struct MemberConfig {
+    /// Nodes hosts, [pod_name]->[pod_host]
+    pub(crate) members: HashMap<String, String>,
+    /// The xline server port
+    pub(crate) xline_port: u16,
+    /// The sidecar web server port
+    pub(crate) sidecar_port: u16,
+}
+
 /// Sidecar backend, it determinate how xline could be setup
 #[derive(Debug, Clone, PartialEq, Eq)]
 #[non_exhaustive]
@@ -80,9 +91,7 @@ pub enum BackupConfig {
 
 impl Config {
     /// Get the initial sidecar members
-    #[must_use]
-    #[inline]
-    pub fn init_sidecar_members(&self) -> HashMap<String, String> {
+    pub(crate) fn init_sidecar_members(&self) -> HashMap<String, String> {
         self.init_members
             .clone()
             .into_iter()
@@ -91,15 +100,48 @@ impl Config {
     }
 
     /// Get the initial xline members
-    #[must_use]
-    #[inline]
-    pub fn init_xline_members(&self) -> HashMap<String, String> {
+    pub(crate) fn init_xline_members(&self) -> HashMap<String, String> {
         self.init_members
             .clone()
             .into_iter()
             .map(|(name, host)| (name, format!("{host}:{}", self.xline_port)))
             .collect()
     }
+
+    /// Get the initial member config
+    pub(crate) fn init_member_config(&self) -> MemberConfig {
+        MemberConfig {
+            members: self.init_members.clone(),
+            xline_port: self.xline_port,
+            sidecar_port: self.sidecar_port,
+        }
+    }
+}
+
+impl MemberConfig {
+    /// Get the xline members
+    pub(crate) fn xline_members(&self) -> HashMap<String, String> {
+        self.members
+            .clone()
+            .into_iter()
+            .map(|(name, host)| (name, format!("{host}:{}", self.xline_port)))
+            .collect()
+    }
+
+    /// Get the sidecar members
+    pub(crate) fn sidecar_members(&self) -> HashMap<String, String> {
+        self.members
+            .clone()
+            .into_iter()
+            .map(|(name, host)| (name, format!("{host}:{}", self.sidecar_port)))
+            .collect()
+    }
+
+    /// Get the majority count
+
+    pub(crate) fn majority_cnt(&self) -> usize {
+        self.members.len().div(2).add(1)
+    }
 }
 
 /// Sidecar operator state
@@ -144,7 +186,7 @@ impl StateStatus {
         for (name, addr) in sidecars {
             let url = format!("http://{addr}{SIDECAR_STATE_ROUTE}");
             let state: StatePayload = reqwest::get(url).await?.json().await?;
-            if state.revision > max_rev {
+            if (state.revision, name.as_str()) > (max_rev, seeder) {
                 max_rev = state.revision;
                 seeder = name;
             }
diff --git a/sidecar/src/xline.rs b/sidecar/src/xline.rs
index bddc185a..b8f403d3 100644
--- a/sidecar/src/xline.rs
+++ b/sidecar/src/xline.rs
@@ -4,20 +4,21 @@ use std::collections::HashMap;
 use std::fmt::Debug;
+use std::io;
+use std::path::{Path, PathBuf};
 use std::time::Duration;
 
 use anyhow::{anyhow, Result};
 use bytes::Buf;
 use engine::{Engine, EngineType, StorageEngine};
-use futures::stream::FuturesUnordered;
-use futures::StreamExt;
-use operator_api::consts::DEFAULT_DATA_DIR;
 use tonic::transport::{Channel, Endpoint};
 use tonic_health::pb::health_check_response::ServingStatus;
 use tonic_health::pb::health_client::HealthClient;
 use tonic_health::pb::HealthCheckRequest;
-use tracing::debug;
+use tracing::{debug, info};
-use xline_client::types::cluster::{MemberAddRequest, MemberListRequest, MemberRemoveRequest};
+use xline_client::types::cluster::{
+    MemberAddRequest, MemberListRequest, MemberRemoveRequest, MemberUpdateRequest,
+};
 use xline_client::types::kv::RangeRequest;
 use xline_client::{Client, ClientOptions};
 
@@ -52,6 +53,8 @@ pub(crate) const XLINE_TABLES: [&str; 6] = [
 pub(crate) struct XlineHandle {
     /// The name of the operator
     name: String,
+    /// The xline data dir
+    data_dir: PathBuf,
     /// The xline backup provider
     backup: Option<Box<dyn Provider>>,
     /// The xline client, used to connect to the cluster
@@ -62,8 +65,6 @@ pub(crate) struct XlineHandle {
     server_id: Option<u64>,
     /// The rocks db engine
     engine: Engine,
-    /// The xline members
-    xline_members: HashMap<String, String>,
     /// Health retires of xline client
     is_healthy_retries: usize,
     /// The detailed xline process handle
@@ -74,24 +75,25 @@ impl XlineHandle {
     /// Create the xline handle but not start the xline node
     pub(crate) fn open(
         name: &str,
+        data_dir: &str,
         backup: Option<Box<dyn Provider>>,
         inner: Box<dyn operator_api::XlineHandle>,
         xline_port: u16,
-        xline_members: HashMap<String, String>,
     ) -> Result<Self> {
         debug!("name: {name}, backup: {backup:?}, xline_port: {xline_port}");
         let endpoint: Endpoint = format!("http://127.0.0.1:{xline_port}").parse()?;
         let channel = Channel::balance_list(std::iter::once(endpoint));
         let health_client = HealthClient::new(channel);
-        let engine = Engine::new(EngineType::Rocks(DEFAULT_DATA_DIR.parse()?), &XLINE_TABLES)?;
+        let data_path: PathBuf = data_dir.parse()?;
+        let engine = Engine::new(EngineType::Rocks(data_path.clone()), &XLINE_TABLES)?;
         Ok(Self {
             name: name.to_owned(),
+            data_dir: data_path,
             backup,
             health_client,
             engine,
             client: None,
             server_id: None,
-            xline_members,
             is_healthy_retries: 5,
             inner,
         })
@@ -104,49 +106,76 @@ impl XlineHandle {
             .unwrap_or_else(|| panic!("xline client not initialized"))
     }
 
+    /// Cleanup data directory
+    pub(crate) async fn cleanup(&self) -> Result<()> {
+        tokio::fs::remove_dir_all(&self.data_dir).await?;
+        Ok(())
+    }
+
     /// Start the xline server
-    pub(crate) async fn start(&mut self) -> Result<()> {
+    pub(crate) async fn start(&mut self, xlines: &HashMap<String, String>) -> Result<()> {
+        /// Timeout for test start
+        const TEST_START_TIMEOUT: Duration = Duration::from_secs(3);
+        // TODO: hold a distributed lock during start
         // Step 1: Check if there is any node running
         // Step 2: If there is no node running, start single node cluster
-        // Step 3: If there are some nodes running, start the node as a member to join the cluster
-        let others = self
-            .xline_members
-            .iter()
-            .filter(|&(name, _)| name != &self.name)
-            .map(|(_, addr)| {
-                Ok::<_, tonic::transport::Error>(
-                    Endpoint::from_shared(addr.clone())?.connect_timeout(Duration::from_secs(3)),
-                )
-            })
-            .collect::<Result<Vec<_>, _>>()?;
-        let futs: FuturesUnordered<_> = others.iter().map(Endpoint::connect).collect();
+        // Step 3: If there are some nodes running, start the node as a member to join/update the cluster
+
+        let self_addr = xlines
+            .get(&self.name)
+            .ok_or_else(|| anyhow!("self name should be in xline members"))?
+            .clone();
+        let mut start_members = HashMap::from([(self.name.clone(), self_addr.clone())]);
 
         // the cluster is started if any of the connection is successful
-        let cluster_started = futs.any(|res| async move { res.is_ok() }).await;
+        let mut cluster_started = false;
+
+        for (name, addr) in xlines.iter().filter(|&(name, _)| name != &self.name) {
+            let online = Endpoint::from_shared(addr.clone())?
+                .connect_timeout(TEST_START_TIMEOUT)
+                .connect()
+                .await
+                .is_ok();
+            if online {
+                cluster_started = true;
+                let _ig = start_members.insert(name.clone(), addr.clone());
+            }
+        }
 
-        self.inner.start(&self.xline_members).await?;
+        self.inner.start(&start_members).await?;
 
-        let client = Client::connect(self.xline_members.values(), ClientOptions::default()).await?;
+        let client = Client::connect(xlines.values(), ClientOptions::default()).await?;
         let mut cluster_client = client.cluster_client();
+
+        let mut members = cluster_client
+            .member_list(MemberListRequest::new(false))
+            .await?
+            .members;
+
         let member = if cluster_started {
-            let peer_addr = self
-                .xline_members
-                .get(&self.name)
-                .unwrap_or_else(|| unreachable!("member should contain self"))
-                .clone();
-            let resp = cluster_client
-                .member_add(MemberAddRequest::new(vec![peer_addr], false))
-                .await?;
-            let Some(member) = resp.member else {
-                unreachable!("self member should be set when member add request success")
-            };
-            member
+            let joined = members.iter().find(|mem| mem.name == self.name);
+            if let Some(old_member) = joined {
+                let _ig = cluster_client
+                    .member_update(MemberUpdateRequest::new(
+                        old_member.id,
+                        vec![self_addr.clone()],
+                    ))
+                    .await?;
+                let mut new_member = old_member.clone();
+                new_member.peer_ur_ls = vec![self_addr.clone()];
+                new_member.client_ur_ls = vec![self_addr.clone()];
+                new_member
+            } else {
+                let resp = cluster_client
+                    .member_add(MemberAddRequest::new(vec![self_addr.clone()], false))
+                    .await?;
+                let Some(member) = resp.member else {
+                    unreachable!("self member should be set when member add request success")
+                };
+                member
+            }
         } else {
-            let mut members = cluster_client
-                .member_list(MemberListRequest::new(false))
-                .await?
-                .members;
             if members.len() != 1 {
                 return Err(anyhow!(
                     "there should be only one member(self) if the cluster if not start"
                 ))
@@ -169,6 +198,7 @@ impl XlineHandle {
             .take()
             .ok_or_else(|| anyhow!("xline server should not be stopped before started"))?;
 
+        // double check for cluster health
         if self.is_healthy().await {
            let mut cluster_client = self.client().cluster_client();
             _ = cluster_client
@@ -253,7 +283,10 @@ impl XlineHandle {
         // If the local revision is less than remote, abort backup
         // Step 3. Start backup
         let backup = match self.backup.as_ref() {
-            None => return Err(anyhow!("no backup specified")),
+            None => {
+                info!("no backup config found, skip backup");
+                return Ok(());
+            }
             Some(backup) => backup,
         };
         let remote = backup.latest().await?.map(|metadata| metadata.revision);
@@ -280,4 +313,50 @@ impl XlineHandle {
             .await?;
         Ok(())
     }
+
+    /// Install backup, make sure that xline is shutdown
+    pub(crate) async fn install_backup(&self) -> Result<()> {
+        let backup = match self.backup.as_ref() {
+            None => {
+                info!("no backup config found, skip install backup");
+                return Ok(());
+            }
+            Some(backup) => backup,
+        };
+        let Some(latest) = backup.latest().await? else {
+            info!("no backup found, skip install backup");
+            return Ok(())
+        };
+        if !self.data_dir.exists() {
+            debug!("data directory not found, install backup");
+            let local = backup.load(&latest).await?;
+            copy_recursively(&local, &self.data_dir)?;
+            tokio::fs::remove_dir_all(local).await?;
+            return Ok(());
+        }
+        if latest.revision <= self.revision_offline()? {
+            info!("remote revision is less than local, skip install backup");
+            return Ok(());
+        }
+        tokio::fs::remove_dir_all(&self.data_dir).await?;
+        let local = backup.load(&latest).await?;
+        copy_recursively(&local, &self.data_dir)?;
+        tokio::fs::remove_dir_all(local).await?;
+        Ok(())
+    }
+}
+
+/// Copy directory
+fn copy_recursively(source: impl AsRef<Path>, destination: impl AsRef<Path>) -> io::Result<()> {
+    std::fs::create_dir_all(&destination)?;
+    for entry in std::fs::read_dir(source)? {
+        let entry = entry?;
+        let filetype = entry.file_type()?;
+        if filetype.is_dir() {
+            copy_recursively(entry.path(), destination.as_ref().join(entry.file_name()))?;
+        } else {
+            let _ig = std::fs::copy(entry.path(), destination.as_ref().join(entry.file_name()))?;
+        }
+    }
+    Ok(())
 }
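A minimal, standalone sketch (editorial illustration only, not part of the patch) of the decision table that the new `reconcile_once` implements: the four `(cluster_health, xline_running)` branches plus the quorum check. The `decide` helper and its string labels are made up for the example; `majority` mirrors `MemberConfig::majority_cnt`, i.e. `members.len() / 2 + 1`.

fn decide(cluster_healthy: bool, xline_running: bool, ok_count: usize, majority: usize) -> &'static str {
    match (cluster_healthy, xline_running) {
        (true, true) => "healthy: keep state OK",
        (true, false) => "healthy but xline down: start xline and join the cluster",
        (false, true) if ok_count >= majority => "unhealthy but quorum alive: wait",
        (false, true) => "quorum lost: backup data and stop xline for recovery",
        (false, false) => "nothing running: install backup, wait for all sidecars, seeder seeds",
    }
}

fn main() {
    // 3-member cluster -> majority of 2
    let majority = 3 / 2 + 1;
    assert_eq!(decide(false, true, 2, majority), "unhealthy but quorum alive: wait");
    assert_eq!(decide(false, true, 1, majority), "quorum lost: backup data and stop xline for recovery");
}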