Skip to content

Commit

Permalink
feat: implement xline handle
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <[email protected]>
  • Loading branch information
iGxnon committed Oct 23, 2023
1 parent 9c6d855 commit d94d973
Show file tree
Hide file tree
Showing 9 changed files with 353 additions and 60 deletions.
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions operator-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ async-trait = "0.1.72"
k8s-openapi = { version = "0.18.0", features = ["v1_26", "schemars"] }
kube = { version = "0.83.0", features = ["runtime", "derive", "ws"] }
serde = { version = "1.0.130", features = ["derive"] }
anyhow = "1.0.72"
2 changes: 2 additions & 0 deletions operator-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ mod xline;

pub mod consts;

pub use xline::{K8sXlineHandle, LocalXlineHandle, XlineHandle};

use serde::{Deserialize, Serialize};

/// Heartbeat status
Expand Down
118 changes: 104 additions & 14 deletions operator-api/src/xline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,58 @@ use async_trait::async_trait;
use k8s_openapi::api::core::v1::Pod;
use kube::api::{AttachParams, AttachedProcess};
use kube::Api;
use std::fmt::{Debug, Formatter};

use std::process::{Child, Command};

/// xline handle abstraction
#[async_trait]
pub trait XlineHandle {
/// the err during start and kill
type Err;

pub trait XlineHandle: Debug + Send + Sync + 'static {
/// start a xline node
async fn start(&mut self) -> Result<(), Self::Err>;
async fn start(&mut self) -> anyhow::Result<()>; // we truly dont care about what failure happened when start, it just failed

/// kill a xline node
async fn kill(&mut self) -> Result<(), Self::Err>;
async fn kill(&mut self) -> anyhow::Result<()>;
}

/// Local xline handle, it will execute the xline in the local
/// machine with the start_cmd
#[derive(Debug)]
pub struct LocalXlineHandle {
start_cmd: String,
child_proc: Option<Child>,
}

impl LocalXlineHandle {
/// New a local xline handle
pub fn new(start_cmd: String) -> Self {
Self {
start_cmd,
child_proc: None,
}
}
}

#[async_trait]
impl XlineHandle for LocalXlineHandle {
async fn start(&mut self) -> anyhow::Result<()> {
if let Some((exe, args)) = self.start_cmd.split_once(' ') {
let proc = Command::new(exe).args(args.split(' ')).spawn()?;
self.child_proc = Some(proc);
}
unreachable!("the start_cmd must be valid");
}

async fn kill(&mut self) -> anyhow::Result<()> {
if let Some(mut proc) = self.child_proc.take() {
return Ok(proc.kill()?);
}
Ok(())
}
}

/// K8s xline handle
/// K8s xline handle, it will execute the xline start_cmd
/// in pod
pub struct K8sXlineHandle {
/// the pod name
pod_name: String,
Expand All @@ -26,26 +63,79 @@ pub struct K8sXlineHandle {
pods_api: Api<Pod>,
/// the attached process of xline
process: Option<AttachedProcess>,
/// the xline start cmd, parameters are split by ' '
start_cmd: String,
}

impl Debug for K8sXlineHandle {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("K8sXlineHandle")
.field("pod_name", &self.pod_name)
.field("container_name", &self.container_name)
.field("pods_api", &self.pods_api)
.field("start_cmd", &self.start_cmd)
.finish()
}
}

impl K8sXlineHandle {
/// New with default k8s client
pub async fn new_with_default(
pod_name: String,
container_name: String,
namespace: &str,
start_cmd: String,
) -> Self {
let client = kube::Client::try_default()
.await
.unwrap_or_else(|_ig| unreachable!("it must be setup in k8s environment"));
Self {
pod_name,
container_name,
pods_api: Api::namespaced(client, namespace),
process: None,
start_cmd,
}
}

/// New with the provided k8s client
pub fn new_with_client(
pod_name: String,
container_name: String,
client: kube::Client,
namespace: &str,
start_cmd: String,
) -> Self {
Self {
pod_name,
container_name,
pods_api: Api::namespaced(client, namespace),
process: None,
start_cmd,
}
}
}

#[async_trait]
impl XlineHandle for K8sXlineHandle {
type Err = kube::Error;

async fn start(&mut self) -> Result<(), Self::Err> {
async fn start(&mut self) -> anyhow::Result<()> {
let start_cmd: Vec<&str> = self.start_cmd.split(' ').collect();
let process = self
.pods_api
.exec(
&self.pod_name,
vec!["sh"],
start_cmd,
&AttachParams::default().container(&self.container_name),
)
.await?;
self.process = Some(process);
todo!()
Ok(())
}

async fn kill(&mut self) -> Result<(), Self::Err> {
todo!()
async fn kill(&mut self) -> anyhow::Result<()> {
if let Some(process) = self.process.take() {
process.abort();
}
Ok(())
}
}
1 change: 1 addition & 0 deletions sidecar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
uuid = { version = "1.4.1", features = ["v4"] }
xline-client = { git = "https://github.com/xline-kv/Xline.git", package = "xline-client" }
xlineapi = { git = "https://github.com/xline-kv/Xline.git", package = "xlineapi" }
operator-api = { path = "../operator-api" }
Loading

0 comments on commit d94d973

Please sign in to comment.