Skip to content

Commit

Permalink
feat: implement membership change
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <[email protected]>
  • Loading branch information
iGxnon committed Oct 23, 2023
1 parent 3af8b28 commit 14fe2a7
Show file tree
Hide file tree
Showing 8 changed files with 571 additions and 455 deletions.
954 changes: 517 additions & 437 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion operator-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ categories = ["API"]
keywords = ["operator", "API", "operator"]

[dependencies]
anyhow = "1.0.72"
async-trait = "0.1.72"
k8s-openapi = { version = "0.18.0", features = ["v1_26", "schemars"] }
kube = { version = "0.83.0", features = ["runtime", "derive", "ws"] }
serde = { version = "1.0.130", features = ["derive"] }
anyhow = "1.0.72"
2 changes: 2 additions & 0 deletions operator-api/src/xline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ impl LocalXlineHandle {
#[async_trait]
impl XlineHandle for LocalXlineHandle {
async fn start(&mut self) -> anyhow::Result<()> {
self.kill().await?;
let mut cmds = self.start_cmd.split_whitespace();
let Some((exe, args)) = cmds
.next()
Expand Down Expand Up @@ -123,6 +124,7 @@ impl K8sXlineHandle {
#[async_trait]
impl XlineHandle for K8sXlineHandle {
async fn start(&mut self) -> anyhow::Result<()> {
self.kill().await?;
let start_cmd: Vec<&str> = self.start_cmd.split_whitespace().collect();
let process = self
.pods_api
Expand Down
1 change: 0 additions & 1 deletion operator-k8s/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ futures = "0.3.28"
k8s-openapi = { version = "0.18.0", features = ["v1_26", "schemars"] }
kube = { version = "0.83.0", features = ["runtime", "derive"] }
operator-api = { path = "../operator-api" }
lazy_static = "1.4.0"
prometheus = "0.13.3"
schemars = "0.8.6"
serde = { version = "1.0.130", features = ["derive"] }
Expand Down
2 changes: 0 additions & 2 deletions sidecar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,3 @@ tracing = "0.1.37"
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
uuid = { version = "1.4.1", features = ["v4"] }
xline-client = { git = "https://github.com/xline-kv/Xline.git", package = "xline-client" }
xlineapi = { git = "https://github.com/xline-kv/Xline.git", package = "xlineapi" }
operator-api = { path = "../operator-api" }
2 changes: 1 addition & 1 deletion sidecar/src/backup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::time::Duration;
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use tonic::Streaming;
use xlineapi::SnapshotResponse;
use xline_client::types::maintenance::SnapshotResponse;

/// Snapshot file suffix
const SNAPSHOT_SUFFIX: &str = "xline.backup";
Expand Down
2 changes: 1 addition & 1 deletion sidecar/src/backup/pv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::io;
use tokio::io::AsyncWriteExt;
use tonic::Streaming;
use tracing::debug;
use xlineapi::SnapshotResponse;
use xline_client::types::maintenance::SnapshotResponse;

use crate::backup::{Metadata, Provider, SNAPSHOT_SUFFIX};

Expand Down
61 changes: 49 additions & 12 deletions sidecar/src/xline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use tonic_health::pb::health_check_response::ServingStatus;
use tonic_health::pb::health_client::HealthClient;
use tonic_health::pb::HealthCheckRequest;
use tracing::debug;
use xline_client::types::cluster::{MemberAddRequest, MemberListRequest, MemberRemoveRequest};
use xline_client::types::kv::RangeRequest;
use xline_client::{Client, ClientOptions};

Expand Down Expand Up @@ -53,10 +54,12 @@ pub(crate) struct XlineHandle {
name: String,
/// The xline backup provider
backup: Option<Box<dyn Provider>>,
/// The xline client
/// The xline client, used to connect to the cluster
client: Option<Client>,
/// The xline health client
/// The xline health client, used to check self health
health_client: HealthClient<Channel>,
/// The self xline server id
server_id: Option<u64>,
/// The rocks db engine
engine: Engine,
/// The xline members
Expand Down Expand Up @@ -87,6 +90,7 @@ impl XlineHandle {
health_client,
engine,
client: None,
server_id: None,
xline_members,
is_healthy_retries: 5,
inner,
Expand All @@ -102,6 +106,8 @@ impl XlineHandle {

/// Start the xline server
pub(crate) async fn start(&mut self) -> Result<()> {
// TODO: hold a distributed lock during start

// Step 1: Check if there is any node running
// Step 2: If there is no node running, start single node cluster
// Step 3: If there are some nodes running, start the node as a member to join the cluster
Expand All @@ -121,24 +127,55 @@ impl XlineHandle {
self.inner.start().await?;

let client = Client::connect(self.xline_members.values(), ClientOptions::default()).await?;
if cluster_started {
let _cluster_client = client.cluster_client();
// send membership change here
}
let _ig = self.client.replace(client);
let mut cluster_client = client.cluster_client();
let member = if cluster_started {
let peer_addr = self
.xline_members
.get(&self.name)
.unwrap_or_else(|| unreachable!("member should contain self"))
.clone();
let resp = cluster_client
.member_add(MemberAddRequest::new(vec![peer_addr], false))
.await?;
let Some(member) = resp.member else {
unreachable!("self member should be set when member add request success")
};
member
} else {
let mut members = cluster_client
.member_list(MemberListRequest::new(false))
.await?
.members;
if members.len() != 1 {
return Err(anyhow!(
"there should be only one member(self) if the cluster if not start"
));
}
members.remove(0)
};
debug!("xline server started, member: {:?}", member);
_ = self.server_id.replace(member.id);
_ = self.client.replace(client);
Ok(())
}

/// Stop the xline server
pub(crate) async fn stop(&mut self) -> Result<()> {
// Step 1: Kill the xline node
// Step 2: Remove the xline node from the cluster if the cluster exist
self.inner.kill().await?;
// Step 1: Remove the xline node from the cluster if the cluster exist
// Step 2: Kill the xline node
let server_id = self
.server_id
.take()
.ok_or_else(|| anyhow!("xline server should not be stopped before started"))?;

if self.is_healthy().await {
let _cluster_client = self.client().cluster_client();
// send membership change here
let mut cluster_client = self.client().cluster_client();
_ = cluster_client
.member_remove(MemberRemoveRequest::new(server_id))
.await?;
}

self.inner.kill().await?;
Ok(())
}

Expand Down

0 comments on commit 14fe2a7

Please sign in to comment.