From 4e525c060678ea25a603b363272fd6013993109b Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Tue, 3 Sep 2024 16:51:19 +0800 Subject: [PATCH] feat: reimplement fetch cluster Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> chore: move fetch_impl to upper level chore: move config.rs to upper level Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> refactor: client fetch Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/client/cluster_state.rs | 18 ++- crates/curp/src/client/{unary => }/config.rs | 0 crates/curp/src/client/fetch.rs | 121 +++++++++++++++++++ crates/curp/src/client/mod.rs | 8 ++ crates/curp/src/client/unary/mod.rs | 19 ++- 5 files changed, 159 insertions(+), 7 deletions(-) rename crates/curp/src/client/{unary => }/config.rs (100%) create mode 100644 crates/curp/src/client/fetch.rs diff --git a/crates/curp/src/client/cluster_state.rs b/crates/curp/src/client/cluster_state.rs index 8d5818fa9..cdfe72b47 100644 --- a/crates/curp/src/client/cluster_state.rs +++ b/crates/curp/src/client/cluster_state.rs @@ -10,7 +10,8 @@ use crate::{ /// The cluster state /// /// The client must discover the cluster info before sending any propose -struct ClusterState { +#[derive(Default, Clone)] +pub(crate) struct ClusterState { /// Leader id. leader: ServerId, /// Term, initialize to 0, calibrated by the server. @@ -33,6 +34,21 @@ impl std::fmt::Debug for ClusterState { } impl ClusterState { + /// Creates a new `ClusterState` + pub(crate) fn new( + leader: ServerId, + term: u64, + cluster_version: u64, + connects: HashMap>, + ) -> Self { + Self { + leader, + term, + cluster_version, + connects, + } + } + /// Take an async function and map to the dedicated server, return None /// if the server can not found in local state pub(crate) fn map_server>>( diff --git a/crates/curp/src/client/unary/config.rs b/crates/curp/src/client/config.rs similarity index 100% rename from crates/curp/src/client/unary/config.rs rename to crates/curp/src/client/config.rs diff --git a/crates/curp/src/client/fetch.rs b/crates/curp/src/client/fetch.rs new file mode 100644 index 000000000..1b7f4e187 --- /dev/null +++ b/crates/curp/src/client/fetch.rs @@ -0,0 +1,121 @@ +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use curp_external_api::cmd::Command; +use futures::{future, FutureExt, StreamExt}; +use parking_lot::RwLock; +use tonic::Response; +use tracing::warn; +use utils::parking_lot_lock::RwLockMap; + +use crate::{ + quorum, + rpc::{self, connect::ConnectApi, CurpError, FetchClusterRequest, FetchClusterResponse}, +}; + +use super::cluster_state::ClusterState; +use super::config::Config; + +/// Fetch cluster implementation +struct Fetch { + /// The fetch config + config: Config, +} + +impl Fetch { + /// Creates a new `Fetch` + pub(crate) fn new(config: Config) -> Self { + Self { config } + } + + /// Fetch cluster and updates the current state + pub(crate) async fn fetch_cluster( + &self, + state: ClusterState, + ) -> Result { + /// Retry interval + const FETCH_RETRY_INTERVAL: Duration = Duration::from_secs(1); + loop { + let resp = self + .pre_fetch(&state) + .await + .ok_or(CurpError::internal("cluster not available"))?; + let new_members = self.member_addrs(&resp); + let new_connects = self.connect_to(new_members); + let new_state = ClusterState::new( + resp.leader_id + .unwrap_or_else(|| unreachable!("leader id should be Some")) + .into(), + resp.term, + resp.cluster_version, + new_connects, + ); + if self.fetch_term(&new_state).await { + return Ok(new_state); + } + warn!("Fetch cluster failed, sleep for {FETCH_RETRY_INTERVAL:?}"); + tokio::time::sleep(FETCH_RETRY_INTERVAL).await; + } + } + + /// Fetch the term of the cluster. This ensures that the current leader is the latest. + async fn fetch_term(&self, state: &ClusterState) -> bool { + let timeout = self.config.wait_synced_timeout(); + let term = state.term(); + let quorum = state.get_quorum(quorum); + state + .for_each_server(|c| async move { + c.fetch_cluster(FetchClusterRequest { linearizable: true }, timeout) + .await + }) + .filter_map(|r| future::ready(r.ok())) + .map(Response::into_inner) + .filter(move |resp| future::ready(resp.term == term)) + .take(quorum) + .count() + .map(move |t| t >= quorum) + .await + } + + /// Prefetch, send fetch cluster request to the cluster and get the + /// config with the greatest quorum. + async fn pre_fetch(&self, state: &ClusterState) -> Option { + let timeout = self.config.wait_synced_timeout(); + let requests = state.for_each_server(|c| async move { + c.fetch_cluster(FetchClusterRequest { linearizable: true }, timeout) + .await + }); + let responses: Vec<_> = requests + .filter_map(|r| future::ready(r.ok())) + .map(Response::into_inner) + .collect() + .await; + responses + .into_iter() + .filter(|resp| resp.leader_id.is_some()) + .filter(|resp| !resp.members.is_empty()) + .max_by(|x, y| x.term.cmp(&y.term)) + } + + /// Gets the member addresses to connect to + fn member_addrs(&self, resp: &FetchClusterResponse) -> HashMap> { + if self.config.is_raw_curp() { + resp.clone().into_peer_urls() + } else { + resp.clone().into_client_urls() + } + } + + /// Connect to the given addrs + fn connect_to( + &self, + new_members: HashMap>, + ) -> HashMap> { + new_members + .into_iter() + .map(|(id, addrs)| { + let tls_config = self.config.tls_config().cloned(); + (id, rpc::connect(id, addrs, tls_config)) + }) + .collect() + } +} diff --git a/crates/curp/src/client/mod.rs b/crates/curp/src/client/mod.rs index 8e0d0f440..f759f18b9 100644 --- a/crates/curp/src/client/mod.rs +++ b/crates/curp/src/client/mod.rs @@ -21,6 +21,14 @@ mod state; /// State of the cluster mod cluster_state; +#[allow(unused)] +/// Client cluster fetch implementation +mod fetch; + +#[allow(unused)] +/// Config of the client +mod config; + /// Tests for client #[cfg(test)] mod tests; diff --git a/crates/curp/src/client/unary/mod.rs b/crates/curp/src/client/unary/mod.rs index 93563e072..41f510e80 100644 --- a/crates/curp/src/client/unary/mod.rs +++ b/crates/curp/src/client/unary/mod.rs @@ -1,10 +1,6 @@ /// Client propose implementation mod propose_impl; -#[allow(unused)] -/// Config of the client -mod config; - use std::{ cmp::Ordering, marker::PhantomData, @@ -20,8 +16,8 @@ use tonic::Response; use tracing::{debug, warn}; use super::{ - state::State, ClientApi, LeaderStateUpdate, ProposeIdGuard, ProposeResponse, - RepeatableClientApi, + cluster_state::ClusterState, config::Config, state::State, ClientApi, LeaderStateUpdate, + ProposeIdGuard, ProposeResponse, RepeatableClientApi, }; use crate::{ members::ServerId, @@ -68,6 +64,13 @@ pub(super) struct Unary { last_sent_seq: AtomicU64, /// marker phantom: PhantomData, + + #[allow(dead_code)] + /// Cluster state + cluster_state: RwLock, + #[allow(dead_code)] + /// Cluster state + client_config: Config, } impl Unary { @@ -79,6 +82,10 @@ impl Unary { tracker: RwLock::new(Tracker::default()), last_sent_seq: AtomicU64::new(0), phantom: PhantomData, + + // TODO: build cluster state + cluster_state: RwLock::default(), + client_config: Config::default(), } }