Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/Config discovery #37

Closed
wants to merge 18 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: implement heartbeat in sidecar
Signed-off-by: iGxnon <[email protected]>
  • Loading branch information
iGxnon committed Oct 25, 2023

Verified

This commit was signed with the committer’s verified signature.
iGxnon iGxnon
commit 21eca5639e35200f784e2f6d66fdefd8039f896b
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions operator-api/Cargo.toml
Original file line number Diff line number Diff line change
@@ -16,3 +16,5 @@ async-trait = "0.1.72"
k8s-openapi = { version = "0.20.0", features = ["v1_28", "schemars"] }
kube = { version = "0.86.0", features = ["runtime", "derive", "ws"] }
serde = { version = "1.0.130", features = ["derive"] }
reqwest = { version = "0.11", features = ["json"] }
futures = "0.3.28"
4 changes: 4 additions & 0 deletions operator-api/src/consts.rs
Original file line number Diff line number Diff line change
@@ -6,3 +6,7 @@ pub const DEFAULT_DATA_DIR: &str = "/usr/local/xline/data-dir";
pub const OPERATOR_MONITOR_ROUTE: &str = "/monitor";
/// the URL ROUTE of each sidecar for backup
pub const SIDECAR_BACKUP_ROUTE: &str = "/backup";
/// the URL ROUTE of each sidecar member for health checking
pub const SIDECAR_HEALTH_ROUTE: &str = "/health";
/// the URL ROUTE of each sidecar member for getting states
pub const SIDECAR_STATE_ROUTE: &str = "/state";
88 changes: 77 additions & 11 deletions operator-api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -4,11 +4,20 @@ pub mod consts;
/// Xline handle
mod xline;

use std::time::{SystemTime, UNIX_EPOCH};
pub use xline::{K8sXlineHandle, LocalXlineHandle, XlineHandle};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use reqwest::StatusCode;
use std::collections::HashMap;
use std::sync::OnceLock;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

pub use xline::*;

use serde::{Deserialize, Serialize};

/// Heartbeat http client
static HEARTBEAT_CLIENT: OnceLock<reqwest::Client> = OnceLock::new();

/// Heartbeat status, sort by timestamp.
/// The clock of each machine may be different, which may cause heartbeat to be unable to assist
/// the operator in detecting the dropped sidecar.
@@ -44,26 +53,83 @@ impl Ord for HeartbeatStatus {
}

impl HeartbeatStatus {
/// Create a new `HeartbeatStatus` with current timestamp
pub fn current(cluster_name: String, name: String, reachable: Vec<String>) -> Self {
let dur = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_else(|_| unreachable!("time turns back!"));
const DEFAULT_HEALTH_CHECK_TIMEOUT: Duration = Duration::from_secs(10);

/// Create a new `HeartbeatStatus`
pub fn new(cluster_name: String, name: String, timestamp: u64, reachable: Vec<String>) -> Self {
Self {
cluster_name,
name,
timestamp: dur.as_secs(),
timestamp,
reachable,
}
}

/// Create a new `HeartbeatStatus`
pub fn new(cluster_name: String, name: String, timestamp: u64, reachable: Vec<String>) -> Self {
/// Create a new `HeartbeatStatus` from gathered information
pub async fn gather(
cluster_name: String,
name: String,
sidecars: &HashMap<String, String>,
) -> Self {
use consts::SIDECAR_HEALTH_ROUTE;

let client = HEARTBEAT_CLIENT.get_or_init(|| {
reqwest::Client::builder()
.timeout(Self::DEFAULT_HEALTH_CHECK_TIMEOUT)
.build()
.unwrap_or_else(|err| unreachable!("http client build error {err}"))
});

let mut reachable: Vec<_> = sidecars
.iter()
.map(|(name, addr)| async move {
(
name,
client
.get(format!("http://{addr}{SIDECAR_HEALTH_ROUTE}"))
.send()
.await,
)
})
.collect::<FuturesUnordered<_>>()
.filter_map(|(name, resp)| async {
resp.is_ok_and(|r| r.status() == StatusCode::OK)
.then(|| name.clone())
})
.collect()
.await;

// make sure self name should be inside
if !reachable.contains(&name) {
reachable.push(name.clone());
}

let ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_else(|_| unreachable!("time turns back!"));
Self {
cluster_name,
name,
timestamp,
timestamp: ts.as_secs(),
reachable,
}
}

/// Report status to monitor
pub async fn report(&self, monitor_addr: &str) -> anyhow::Result<()> {
use consts::OPERATOR_MONITOR_ROUTE;

let url = format!("http://{monitor_addr}{OPERATOR_MONITOR_ROUTE}");

let client = HEARTBEAT_CLIENT.get_or_init(|| {
reqwest::Client::builder()
.timeout(Self::DEFAULT_HEALTH_CHECK_TIMEOUT)
.build()
.unwrap_or_else(|err| unreachable!("http client build error {err}"))
});

let _ig = client.post(url).json(self).send().await?;

Ok(())
}
}
72 changes: 56 additions & 16 deletions operator-api/src/xline.rs
Original file line number Diff line number Diff line change
@@ -2,15 +2,53 @@ use async_trait::async_trait;
use k8s_openapi::api::core::v1::Pod;
use kube::api::{AttachParams, AttachedProcess};
use kube::Api;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};

use std::process::{Child, Command};

#[derive(Debug, Clone)]
pub struct XlineConfig {
pub name: String,
pub executable: String,
pub storage_engine: String,
pub data_dir: String,
pub is_leader: bool,
pub additional: Option<String>,
}

impl XlineConfig {
fn gen_start_cmd(&self, members: &HashMap<String, String>) -> String {
let mut start_cmd = format!(
"{} --name {} --members {} --storage-engine {} --data-dir {}",
self.executable,
self.name,
members
.iter()
.map(|(name, addr)| format!("{name}={addr}"))
.collect::<Vec<_>>()
.join(","),
self.storage_engine,
self.data_dir,
);
if self.is_leader {
start_cmd.push(' ');
start_cmd.push_str("--is-leader");
}
if let Some(additional) = &self.additional {
start_cmd.push(' ');
let pat: &[_] = &['\'', '"'];
start_cmd.push_str(additional.trim_matches(pat));
}
start_cmd
}
}

/// xline handle abstraction
#[async_trait]
pub trait XlineHandle: Debug + Send + Sync + 'static {
/// start a xline node
async fn start(&mut self) -> anyhow::Result<()>; // we dont care about what failure happened when start, it just failed
async fn start(&mut self, members: &HashMap<String, String>) -> anyhow::Result<()>; // we dont care about what failure happened when start, it just failed

/// kill a xline node
async fn kill(&mut self) -> anyhow::Result<()>;
@@ -20,25 +58,26 @@ pub trait XlineHandle: Debug + Send + Sync + 'static {
/// machine with the start_cmd
#[derive(Debug)]
pub struct LocalXlineHandle {
start_cmd: String,
config: XlineConfig,
child_proc: Option<Child>,
}

impl LocalXlineHandle {
/// New a local xline handle
pub fn new(start_cmd: String) -> Self {
pub fn new(config: XlineConfig) -> Self {
Self {
start_cmd,
config,
child_proc: None,
}
}
}

#[async_trait]
impl XlineHandle for LocalXlineHandle {
async fn start(&mut self) -> anyhow::Result<()> {
async fn start(&mut self, members: &HashMap<String, String>) -> anyhow::Result<()> {
self.kill().await?;
let mut cmds = self.start_cmd.split_whitespace();
let cmd = self.config.gen_start_cmd(members);
let mut cmds = cmd.split_whitespace();
let Some((exe, args)) = cmds
.next()
.map(|exe| (exe, cmds.collect::<Vec<_>>())) else {
@@ -68,8 +107,8 @@ pub struct K8sXlineHandle {
pods_api: Api<Pod>,
/// the attached process of xline
process: Option<AttachedProcess>,
/// the xline start cmd, parameters are split by ' '
start_cmd: String,
/// the xline config
config: XlineConfig,
}

impl Debug for K8sXlineHandle {
@@ -78,7 +117,7 @@ impl Debug for K8sXlineHandle {
.field("pod_name", &self.pod_name)
.field("container_name", &self.container_name)
.field("pods_api", &self.pods_api)
.field("start_cmd", &self.start_cmd)
.field("config", &self.config)
.finish()
}
}
@@ -89,7 +128,7 @@ impl K8sXlineHandle {
pod_name: String,
container_name: String,
namespace: &str,
start_cmd: String,
config: XlineConfig,
) -> Self {
let client = kube::Client::try_default()
.await
@@ -99,7 +138,7 @@ impl K8sXlineHandle {
container_name,
pods_api: Api::namespaced(client, namespace),
process: None,
start_cmd,
config,
}
}

@@ -109,28 +148,29 @@ impl K8sXlineHandle {
container_name: String,
client: kube::Client,
namespace: &str,
start_cmd: String,
config: XlineConfig,
) -> Self {
Self {
pod_name,
container_name,
pods_api: Api::namespaced(client, namespace),
process: None,
start_cmd,
config,
}
}
}

#[async_trait]
impl XlineHandle for K8sXlineHandle {
async fn start(&mut self) -> anyhow::Result<()> {
async fn start(&mut self, members: &HashMap<String, String>) -> anyhow::Result<()> {
self.kill().await?;
let start_cmd: Vec<&str> = self.start_cmd.split_whitespace().collect();
let cmd = self.config.gen_start_cmd(members);
let cmds: Vec<&str> = cmd.split_whitespace().collect();
let process = self
.pods_api
.exec(
&self.pod_name,
start_cmd,
cmds,
&AttachParams::default().container(&self.container_name),
)
.await?;
6 changes: 3 additions & 3 deletions sidecar/src/lib.rs
Original file line number Diff line number Diff line change
@@ -87,7 +87,7 @@
clippy::rc_mutex,
clippy::rest_pat_in_fully_bound_structs,
clippy::same_name_method,
clippy::self_named_module_files,
// clippy::self_named_module_files, false positive
// clippy::shadow_reuse, it’s a common pattern in Rust code
// clippy::shadow_same, it’s a common pattern in Rust code
clippy::shadow_unrelated,
@@ -152,10 +152,10 @@
mod backup;
/// Sidecar operator controller
mod controller;
/// Sidecar operator
pub mod operator;
/// The web server routers
mod routers;
/// Sidecar operator
pub mod sidecar;
/// Sidecar operator types
pub mod types;
/// Some utils
Loading
Loading