Skip to content

Commit

Permalink
feat: reimplement fetch cluster
Browse files Browse the repository at this point in the history
Signed-off-by: bsbds <[email protected]>

chore: move fetch_impl to upper level

chore: move config.rs to upper level

Signed-off-by: bsbds <[email protected]>

refactor: client fetch

Signed-off-by: bsbds <[email protected]>
  • Loading branch information
bsbds committed Dec 3, 2024
1 parent 05504ae commit 4e525c0
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 7 deletions.
18 changes: 17 additions & 1 deletion crates/curp/src/client/cluster_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use crate::{
/// The cluster state
///
/// The client must discover the cluster info before sending any propose
struct ClusterState {
#[derive(Default, Clone)]
pub(crate) struct ClusterState {
/// Leader id.
leader: ServerId,
/// Term, initialize to 0, calibrated by the server.
Expand All @@ -33,6 +34,21 @@ impl std::fmt::Debug for ClusterState {
}

impl ClusterState {
/// Creates a new `ClusterState`
pub(crate) fn new(
leader: ServerId,
term: u64,
cluster_version: u64,
connects: HashMap<ServerId, Arc<dyn ConnectApi>>,
) -> Self {
Self {
leader,
term,
cluster_version,
connects,
}
}

/// Take an async function and map to the dedicated server, return None
/// if the server can not found in local state
pub(crate) fn map_server<R, F: Future<Output = Result<R, CurpError>>>(
Expand Down
File renamed without changes.
121 changes: 121 additions & 0 deletions crates/curp/src/client/fetch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use std::{collections::HashMap, sync::Arc, time::Duration};

use curp_external_api::cmd::Command;
use futures::{future, FutureExt, StreamExt};
use parking_lot::RwLock;
use tonic::Response;
use tracing::warn;
use utils::parking_lot_lock::RwLockMap;

use crate::{
quorum,
rpc::{self, connect::ConnectApi, CurpError, FetchClusterRequest, FetchClusterResponse},
};

use super::cluster_state::ClusterState;
use super::config::Config;

/// Fetch cluster implementation
struct Fetch {
/// The fetch config
config: Config,
}

impl Fetch {
/// Creates a new `Fetch`
pub(crate) fn new(config: Config) -> Self {
Self { config }
}

/// Fetch cluster and updates the current state
pub(crate) async fn fetch_cluster(
&self,
state: ClusterState,
) -> Result<ClusterState, CurpError> {
/// Retry interval
const FETCH_RETRY_INTERVAL: Duration = Duration::from_secs(1);
loop {
let resp = self
.pre_fetch(&state)
.await
.ok_or(CurpError::internal("cluster not available"))?;
let new_members = self.member_addrs(&resp);
let new_connects = self.connect_to(new_members);
let new_state = ClusterState::new(
resp.leader_id
.unwrap_or_else(|| unreachable!("leader id should be Some"))
.into(),
resp.term,
resp.cluster_version,
new_connects,
);
if self.fetch_term(&new_state).await {
return Ok(new_state);
}
warn!("Fetch cluster failed, sleep for {FETCH_RETRY_INTERVAL:?}");
tokio::time::sleep(FETCH_RETRY_INTERVAL).await;
}
}

/// Fetch the term of the cluster. This ensures that the current leader is the latest.
async fn fetch_term(&self, state: &ClusterState) -> bool {
let timeout = self.config.wait_synced_timeout();
let term = state.term();
let quorum = state.get_quorum(quorum);
state
.for_each_server(|c| async move {
c.fetch_cluster(FetchClusterRequest { linearizable: true }, timeout)
.await
})
.filter_map(|r| future::ready(r.ok()))
.map(Response::into_inner)
.filter(move |resp| future::ready(resp.term == term))
.take(quorum)
.count()
.map(move |t| t >= quorum)
.await
}

/// Prefetch, send fetch cluster request to the cluster and get the
/// config with the greatest quorum.
async fn pre_fetch(&self, state: &ClusterState) -> Option<FetchClusterResponse> {
let timeout = self.config.wait_synced_timeout();
let requests = state.for_each_server(|c| async move {
c.fetch_cluster(FetchClusterRequest { linearizable: true }, timeout)
.await
});
let responses: Vec<_> = requests
.filter_map(|r| future::ready(r.ok()))
.map(Response::into_inner)
.collect()
.await;
responses
.into_iter()
.filter(|resp| resp.leader_id.is_some())
.filter(|resp| !resp.members.is_empty())
.max_by(|x, y| x.term.cmp(&y.term))
}

/// Gets the member addresses to connect to
fn member_addrs(&self, resp: &FetchClusterResponse) -> HashMap<u64, Vec<String>> {
if self.config.is_raw_curp() {
resp.clone().into_peer_urls()
} else {
resp.clone().into_client_urls()
}
}

/// Connect to the given addrs
fn connect_to(
&self,
new_members: HashMap<u64, Vec<String>>,
) -> HashMap<u64, Arc<dyn ConnectApi>> {
new_members
.into_iter()
.map(|(id, addrs)| {
let tls_config = self.config.tls_config().cloned();
(id, rpc::connect(id, addrs, tls_config))
})
.collect()
}
}
8 changes: 8 additions & 0 deletions crates/curp/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ mod state;
/// State of the cluster
mod cluster_state;

#[allow(unused)]
/// Client cluster fetch implementation
mod fetch;

#[allow(unused)]
/// Config of the client
mod config;

/// Tests for client
#[cfg(test)]
mod tests;
Expand Down
19 changes: 13 additions & 6 deletions crates/curp/src/client/unary/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
/// Client propose implementation
mod propose_impl;

#[allow(unused)]
/// Config of the client
mod config;

use std::{
cmp::Ordering,
marker::PhantomData,
Expand All @@ -20,8 +16,8 @@ use tonic::Response;
use tracing::{debug, warn};

use super::{
state::State, ClientApi, LeaderStateUpdate, ProposeIdGuard, ProposeResponse,
RepeatableClientApi,
cluster_state::ClusterState, config::Config, state::State, ClientApi, LeaderStateUpdate,
ProposeIdGuard, ProposeResponse, RepeatableClientApi,
};
use crate::{
members::ServerId,
Expand Down Expand Up @@ -68,6 +64,13 @@ pub(super) struct Unary<C: Command> {
last_sent_seq: AtomicU64,
/// marker
phantom: PhantomData<C>,

#[allow(dead_code)]
/// Cluster state
cluster_state: RwLock<ClusterState>,
#[allow(dead_code)]
/// Cluster state
client_config: Config,
}

impl<C: Command> Unary<C> {
Expand All @@ -79,6 +82,10 @@ impl<C: Command> Unary<C> {
tracker: RwLock::new(Tracker::default()),
last_sent_seq: AtomicU64::new(0),
phantom: PhantomData,

// TODO: build cluster state
cluster_state: RwLock::default(),
client_config: Config::default(),
}
}

Expand Down

0 comments on commit 4e525c0

Please sign in to comment.