Skip to content

Commit

Permalink
feat: implement reconcile and evaluate
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <[email protected]>
  • Loading branch information
iGxnon committed Oct 27, 2023
1 parent c025e6d commit 23a7b2f
Show file tree
Hide file tree
Showing 5 changed files with 284 additions and 81 deletions.
112 changes: 94 additions & 18 deletions sidecar/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,36 @@ use std::time::Duration;

use anyhow::Result;
use tokio::select;
use tokio::sync::Mutex;
use tokio::sync::{Mutex, RwLock};
use tokio::time::{interval, MissedTickBehavior};
use tracing::debug;
use tracing::{debug, error, info};

use crate::types::StatePayload;
use crate::types::{MemberConfig, State, StatePayload, StateStatus};
use crate::xline::XlineHandle;

/// Sidecar operator controller
#[derive(Debug)]
pub(crate) struct Controller {
/// The name of this sidecar
name: String,
/// The state of this operator
state: Arc<Mutex<StatePayload>>,
/// Xline handle
handle: Arc<XlineHandle>,
handle: Arc<RwLock<XlineHandle>>,
/// Check interval
reconcile_interval: Duration,
}

impl Controller {
/// Constructor
pub(crate) fn new(
name: String,
state: Arc<Mutex<StatePayload>>,
handle: Arc<XlineHandle>,
handle: Arc<RwLock<XlineHandle>>,
reconcile_interval: Duration,
) -> Self {
Self {
name,
state,
handle,
reconcile_interval,
Expand All @@ -42,44 +46,116 @@ impl Controller {
#[allow(clippy::integer_arithmetic)] // this error originates in the macro `tokio::select`
pub(crate) async fn run_reconcile_with_shutdown(
self,
init_member_config: MemberConfig,
graceful_shutdown: impl Future<Output = ()>,
) -> Result<()> {
select! {
_ = graceful_shutdown => {
Ok(())
}
res = self.run_reconcile() => {
res = self.run_reconcile(init_member_config) => {
res
}
}
}

/// Run reconcile loop
pub(crate) async fn run_reconcile(self) -> Result<()> {
pub(crate) async fn run_reconcile(self, init_member_config: MemberConfig) -> Result<()> {
let mut tick = interval(self.reconcile_interval);
tick.set_missed_tick_behavior(MissedTickBehavior::Skip);
let member_config = init_member_config;
self.handle
.write()
.await
.start(&member_config.xline_members())
.await?;
loop {
let instant = tick.tick().await;
let _result = self.evaluate().await;
let _result1 = self.execute().await;
if let Err(err) = self.reconcile_once(&member_config).await {
error!("reconcile failed, error: {err}");
continue;
}
debug!(
"successfully reconcile the cluster states within {:?}",
instant.elapsed()
);
}
}

/// Evaluate cluster states
#[allow(clippy::unused_async)] // TODO remove when it is implemented
async fn evaluate(&self) -> Result<()> {
// TODO evaluate states
/// Run one reconciliation pass: observe the cluster, publish this sidecar's
/// state and take at most one corrective action.
///
/// The xline handle stays write-locked for the whole pass, so other users of
/// the handle wait until the pass returns.
///
/// Decision table, keyed by `(cluster healthy, local xline running)`:
///
/// * `(true, true)`   - nothing to do, state becomes `OK`.
/// * `(true, false)`  - state becomes `Pending`, local xline is started so it
///   joins the cluster.
/// * `(false, true)`  - state becomes `Pending`; if at least a majority of
///   members report `OK` we just wait, otherwise we back up and stop xline to
///   begin failure recovery.
/// * `(false, false)` - non-seeders install the backup, state becomes `Start`,
///   and the seeder starts xline only once *every* member reports `Start`.
///
/// # Errors
///
/// Returns an error if gathering the sidecar states fails or if any operation
/// on the xline handle (`start`, `backup`, `stop`, `install_backup`) fails.
async fn reconcile_once(&self, member_config: &MemberConfig) -> Result<()> {
    let mut handle = self.handle.write().await;

    let sidecar_members = member_config.sidecar_members();
    let xline_members = member_config.xline_members();
    let cluster_size = member_config.members.len();
    let majority = member_config.majority_cnt();

    let cluster_health = handle.is_healthy().await;
    let xline_running = handle.is_running().await;
    // NOTE: the states are gathered *before* this pass updates its own state,
    // so the snapshot reflects what this sidecar published on the previous pass.
    let states = StateStatus::gather(&sidecar_members).await?;

    match (cluster_health, xline_running) {
        (true, true) => {
            self.set_state(State::OK).await;

            info!("status: cluster healthy + xline running");
        }
        (true, false) => {
            self.set_state(State::Pending).await;

            info!("status: cluster healthy + xline not running, joining the cluster");
            handle.start(&xline_members).await?;
        }
        (false, true) => {
            self.set_state(State::Pending).await;

            let quorum_ok = states
                .states
                .get(&State::OK)
                .is_some_and(|c| *c >= majority);
            if quorum_ok {
                info!("status: cluster unhealthy + xline running + quorum ok, waiting...");
            } else {
                info!(
                    "status: cluster unhealthy + xline running + quorum loss, backup and start failure recovery"
                );
                handle.backup().await?;
                handle.stop().await?;
            }
        }
        (false, false) => {
            let is_seeder = states.seeder == self.name;
            if !is_seeder {
                info!("status: cluster unhealthy + xline not running + not seeder, try to install backup");
                handle.install_backup().await?;
            }

            self.set_state(State::Start).await;

            // Wait unless *all* members report `Start`. A missing `Start` entry
            // means zero members are in that state, which must also wait; the
            // previous `is_some_and(|c| *c != cluster_size)` check let that case
            // fall through and seed the cluster prematurely.
            let all_started = states
                .states
                .get(&State::Start)
                .is_some_and(|c| *c == cluster_size);
            if !all_started {
                info!("status: cluster unhealthy + xline not running + wait all start");
                return Ok(());
            }

            if is_seeder {
                info!(
                    "status: cluster unhealthy + xline not running + all start + seeder, seed cluster"
                );
                handle.start(&xline_members).await?;
            }
        }
    }

    Ok(())
}

/// Execute reconciliation based on evaluation
#[allow(clippy::unused_async)] // TODO remove when it is implemented
async fn execute(&self) -> Result<()> {
// TODO execute reconciliation
Ok(())
/// Publish `state` as this sidecar's current state.
///
/// Takes the state mutex, overwrites the `state` field of the shared payload
/// and releases the lock when the guard goes out of scope.
async fn set_state(&self, state: State) {
    let mut payload = self.state.lock().await;
    payload.state = state;
}
}
6 changes: 3 additions & 3 deletions sidecar/src/routers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use axum::http::StatusCode;
use axum::{Extension, Json};
use tokio::sync::Mutex;
use tokio::sync::{Mutex, RwLock};

use crate::types::{MembershipChange, StatePayload};
use crate::utils::{check_backup_volume, check_data_volume};
Expand All @@ -22,8 +22,8 @@ pub(crate) async fn health() -> StatusCode {
}

/// Backup hook
pub(crate) async fn backup(Extension(handle): Extension<Arc<XlineHandle>>) -> StatusCode {
if handle.backup().await.is_err() {
pub(crate) async fn backup(Extension(handle): Extension<Arc<RwLock<XlineHandle>>>) -> StatusCode {
if handle.read().await.backup().await.is_err() {
return StatusCode::INTERNAL_SERVER_ERROR;
}
StatusCode::OK
Expand Down
22 changes: 14 additions & 8 deletions sidecar/src/sidecar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use futures::{FutureExt, TryFutureExt};
use operator_api::consts::{SIDECAR_BACKUP_ROUTE, SIDECAR_HEALTH_ROUTE, SIDECAR_STATE_ROUTE};
use operator_api::{HeartbeatStatus, K8sXlineHandle, LocalXlineHandle};
use tokio::sync::watch::{Receiver, Sender};
use tokio::sync::Mutex;
use tokio::sync::{Mutex, RwLock};
use tokio::time::{interval, MissedTickBehavior};
use tracing::{debug, error, info, warn};

Expand Down Expand Up @@ -42,8 +42,8 @@ impl Sidecar {
pub async fn run(&self) -> Result<()> {
let (graceful_shutdown_event, _) = tokio::sync::watch::channel(());
let forceful_shutdown = self.forceful_shutdown(&graceful_shutdown_event);
let handle = Arc::new(self.init_xline_handle().await?);
let revision = handle.revision_offline().unwrap_or(1);
let handle = Arc::new(RwLock::new(self.init_xline_handle().await?));
let revision = handle.read().await.revision_offline().unwrap_or(1);
let state = Arc::new(Mutex::new(StatePayload {
state: State::Start,
revision,
Expand Down Expand Up @@ -114,10 +114,10 @@ impl Sidecar {
};
XlineHandle::open(
&self.config.name,
&self.config.xline.data_dir,
backup,
inner,
self.config.xline_port,
self.config.init_xline_members(),
)
}

Expand Down Expand Up @@ -173,15 +173,21 @@ impl Sidecar {
/// Start controller
fn start_controller(
&self,
handle: Arc<XlineHandle>,
handle: Arc<RwLock<XlineHandle>>,
state: Arc<Mutex<StatePayload>>,
graceful_shutdown: Receiver<()>,
) {
let controller = Controller::new(state, handle, self.config.reconcile_interval);
let controller = Controller::new(
self.config.name.clone(),
state,
handle,
self.config.reconcile_interval,
);
let init_member_config = self.config.init_member_config();
let _ig = tokio::spawn(async move {
let mut shutdown = graceful_shutdown;
let res = controller
.run_reconcile_with_shutdown(shutdown.changed().map(|_| ()))
.run_reconcile_with_shutdown(init_member_config, shutdown.changed().map(|_| ()))
.await;
if let Err(err) = res {
error!("controller run failed, error: {err}");
Expand All @@ -194,7 +200,7 @@ impl Sidecar {
/// Run a web server to expose current state to other sidecar operators and k8s
fn start_web_server(
&self,
handle: Arc<XlineHandle>,
handle: Arc<RwLock<XlineHandle>>,
state: Arc<Mutex<StatePayload>>,
graceful_shutdown: Receiver<()>,
) -> Result<()> {
Expand Down
60 changes: 51 additions & 9 deletions sidecar/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use operator_api::XlineConfig;
use serde::{Deserialize, Serialize};

use std::collections::HashMap;
use std::ops::AddAssign;
use std::ops::{Add, AddAssign, Div};
use std::path::PathBuf;
use std::time::Duration;

Expand All @@ -16,7 +16,7 @@ pub struct Config {
pub name: String,
/// The cluster name
pub cluster_name: String,
/// Sidecar initial hosts, [pod_name]->[pod_host]
/// Nodes initial hosts, [pod_name]->[pod_host]
pub init_members: HashMap<String, String>,
/// The xline server port
pub xline_port: u16,
Expand Down Expand Up @@ -45,6 +45,17 @@ pub struct MonitorConfig {
pub heartbeat_interval: Duration,
}

/// Member config
///
/// Holds one host per cluster member together with the two ports that are
/// appended to that host to build the xline and sidecar addresses.
#[derive(Debug, Clone)]
pub(crate) struct MemberConfig {
/// Member hosts, keyed by member name: `pod_name -> pod_host`
pub(crate) members: HashMap<String, String>,
/// The xline server port, appended to each host to form an xline address
pub(crate) xline_port: u16,
/// The sidecar web server port, appended to each host to form a sidecar address
pub(crate) sidecar_port: u16,
}

/// Sidecar backend, which determines how xline is set up
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
Expand Down Expand Up @@ -80,9 +91,7 @@ pub enum BackupConfig {

impl Config {
/// Get the initial sidecar members
#[must_use]
#[inline]
pub fn init_sidecar_members(&self) -> HashMap<String, String> {
pub(crate) fn init_sidecar_members(&self) -> HashMap<String, String> {
self.init_members
.clone()
.into_iter()
Expand All @@ -91,15 +100,48 @@ impl Config {
}

/// Get the initial xline members
#[must_use]
#[inline]
pub fn init_xline_members(&self) -> HashMap<String, String> {
pub(crate) fn init_xline_members(&self) -> HashMap<String, String> {
    // Every initial member keeps its name; its host is suffixed with the
    // xline server port to form a "host:port" address.
    let port = self.xline_port;
    self.init_members
        .iter()
        .map(|(name, host)| (name.clone(), format!("{host}:{port}")))
        .collect()
}

/// Build the initial [`MemberConfig`] from this configuration.
///
/// The result owns a copy of the initial member map together with the xline
/// and sidecar ports.
pub(crate) fn init_member_config(&self) -> MemberConfig {
    let members = self.init_members.clone();
    let xline_port = self.xline_port;
    let sidecar_port = self.sidecar_port;
    MemberConfig {
        members,
        xline_port,
        sidecar_port,
    }
}
}

impl MemberConfig {
    /// Map every member name to its `"host:port"` address for the given port
    fn members_with_port(&self, port: u16) -> HashMap<String, String> {
        self.members
            .iter()
            .map(|(name, host)| (name.clone(), format!("{host}:{port}")))
            .collect()
    }

    /// Get the xline members, i.e. `name -> "host:xline_port"`
    pub(crate) fn xline_members(&self) -> HashMap<String, String> {
        self.members_with_port(self.xline_port)
    }

    /// Get the sidecar members, i.e. `name -> "host:sidecar_port"`
    pub(crate) fn sidecar_members(&self) -> HashMap<String, String> {
        self.members_with_port(self.sidecar_port)
    }

    /// Get the majority count: half of the member count (rounded down) plus one
    pub(crate) fn majority_cnt(&self) -> usize {
        // Trait-method form instead of `/` and `+` operators, as in the original.
        let half = self.members.len().div(2);
        half.add(1)
    }
}

/// Sidecar operator state
Expand Down Expand Up @@ -144,7 +186,7 @@ impl StateStatus {
for (name, addr) in sidecars {
let url = format!("http://{addr}{SIDECAR_STATE_ROUTE}");
let state: StatePayload = reqwest::get(url).await?.json().await?;
if state.revision > max_rev {
if (state.revision, name.as_str()) > (max_rev, seeder) {
max_rev = state.revision;
seeder = name;
}
Expand Down
Loading

0 comments on commit 23a7b2f

Please sign in to comment.