-
Notifications
You must be signed in to change notification settings - Fork 76
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: reimplement curp client keep alive #1006
Open
bsbds
wants to merge
10
commits into
xline-kv:refactor-curp-client
Choose a base branch
from
bsbds:impl-curp-client-keep-alive
base: refactor-curp-client
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
da43347
refactor: reimplement curp client state
bsbds ecbc804
refactor: add new to `Config`
bsbds e5e56db
feat: implement map functions for `ClusterState`
bsbds e0d8800
feat: implement `get_quorum` method for `ClusterState`
bsbds a75a12b
chore: rename state.rs to cluster_state.rs
bsbds df5f715
chore: move cluster_state to upper level
bsbds 7fc23b4
chore: remove mutable methods from ClusterState
bsbds 05504ae
fix: curp client config
bsbds 4e525c0
feat: reimplement fetch cluster
bsbds b216c9c
refactor: curp client lease keep alive
bsbds File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
use std::{collections::HashMap, sync::Arc}; | ||
|
||
use futures::{stream::FuturesUnordered, Future}; | ||
|
||
use crate::{ | ||
members::ServerId, | ||
rpc::{connect::ConnectApi, CurpError}, | ||
}; | ||
|
||
/// The cluster state | ||
/// | ||
/// The client must discover the cluster info before sending any propose | ||
#[derive(Default, Clone)] | ||
pub(crate) struct ClusterState { | ||
/// Leader id. | ||
leader: ServerId, | ||
/// Term, initialize to 0, calibrated by the server. | ||
term: u64, | ||
/// Cluster version, initialize to 0, calibrated by the server. | ||
cluster_version: u64, | ||
/// Members' connect, calibrated by the server. | ||
connects: HashMap<ServerId, Arc<dyn ConnectApi>>, | ||
} | ||
|
||
impl std::fmt::Debug for ClusterState { | ||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
f.debug_struct("State") | ||
.field("leader", &self.leader) | ||
.field("term", &self.term) | ||
.field("cluster_version", &self.cluster_version) | ||
.field("connects", &self.connects.keys()) | ||
.finish() | ||
} | ||
} | ||
|
||
impl ClusterState { | ||
/// Creates a new `ClusterState` | ||
pub(crate) fn new( | ||
leader: ServerId, | ||
term: u64, | ||
cluster_version: u64, | ||
connects: HashMap<ServerId, Arc<dyn ConnectApi>>, | ||
) -> Self { | ||
Self { | ||
leader, | ||
term, | ||
cluster_version, | ||
connects, | ||
} | ||
} | ||
|
||
/// Take an async function and map to the dedicated server, return None | ||
/// if the server can not found in local state | ||
pub(crate) fn map_server<R, F: Future<Output = Result<R, CurpError>>>( | ||
&self, | ||
id: ServerId, | ||
f: impl FnOnce(Arc<dyn ConnectApi>) -> F, | ||
) -> Option<F> { | ||
// If the leader id cannot be found in connects, it indicates that there is | ||
// an inconsistency between the client's local leader state and the cluster | ||
// state, then mock a `WrongClusterVersion` return to the outside. | ||
self.connects.get(&id).map(Arc::clone).map(f) | ||
} | ||
|
||
/// Take an async function and map to the dedicated server, return None | ||
/// if the server can not found in local state | ||
pub(crate) fn map_leader<R, F: Future<Output = Result<R, CurpError>>>( | ||
&self, | ||
f: impl FnOnce(Arc<dyn ConnectApi>) -> F, | ||
) -> F { | ||
// If the leader id cannot be found in connects, it indicates that there is | ||
// an inconsistency between the client's local leader state and the cluster | ||
// state, then mock a `WrongClusterVersion` return to the outside. | ||
f(Arc::clone(self.connects.get(&self.leader).unwrap_or_else( | ||
|| unreachable!("leader connect should always exists"), | ||
))) | ||
} | ||
|
||
/// Take an async function and map to all server, returning `FuturesUnordered<F>` | ||
pub(crate) fn for_each_server<R, F: Future<Output = R>>( | ||
&self, | ||
f: impl FnMut(Arc<dyn ConnectApi>) -> F, | ||
) -> FuturesUnordered<F> { | ||
self.connects.values().map(Arc::clone).map(f).collect() | ||
} | ||
|
||
/// Take an async function and map to all server, returning `FuturesUnordered<F>` | ||
pub(crate) fn for_each_follower<R, F: Future<Output = R>>( | ||
&self, | ||
f: impl FnMut(Arc<dyn ConnectApi>) -> F, | ||
) -> FuturesUnordered<F> { | ||
self.connects | ||
.iter() | ||
.filter_map(|(id, conn)| (*id != self.leader).then_some(conn)) | ||
.map(Arc::clone) | ||
.map(f) | ||
.collect() | ||
} | ||
|
||
/// Returns the quorum size based on the given quorum function | ||
/// | ||
/// NOTE: Do not update the cluster in between an `for_each_xxx` and an `get_quorum`, which may | ||
/// lead to inconsistent quorum. | ||
pub(crate) fn get_quorum<Q: FnMut(usize) -> usize>(&self, mut quorum: Q) -> usize { | ||
let cluster_size = self.connects.len(); | ||
quorum(cluster_size) | ||
} | ||
|
||
/// Returns the term of the cluster | ||
pub(crate) fn term(&self) -> u64 { | ||
self.term | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
use std::time::Duration; | ||
|
||
use tonic::transport::ClientTlsConfig; | ||
|
||
use crate::members::ServerId; | ||
|
||
/// Client config | ||
#[derive(Default, Debug, Clone)] | ||
pub(crate) struct Config { | ||
/// Local server id, should be initialized on startup | ||
local_server: Option<ServerId>, | ||
/// Client tls config | ||
tls_config: Option<ClientTlsConfig>, | ||
/// The rpc timeout of a propose request | ||
propose_timeout: Duration, | ||
/// The rpc timeout of a 2-RTT request, usually takes longer than propose timeout | ||
/// | ||
/// The recommended the values is within (propose_timeout, 2 * propose_timeout]. | ||
wait_synced_timeout: Duration, | ||
/// is current client send request to raw curp server | ||
is_raw_curp: bool, | ||
} | ||
|
||
impl Config { | ||
/// Creates a new `Config` | ||
pub(crate) fn new( | ||
local_server: Option<ServerId>, | ||
tls_config: Option<ClientTlsConfig>, | ||
propose_timeout: Duration, | ||
wait_synced_timeout: Duration, | ||
is_raw_curp: bool, | ||
) -> Self { | ||
Self { | ||
local_server, | ||
tls_config, | ||
propose_timeout, | ||
wait_synced_timeout, | ||
is_raw_curp, | ||
} | ||
} | ||
|
||
/// Get the local server id | ||
pub(crate) fn local_server(&self) -> Option<ServerId> { | ||
self.local_server | ||
} | ||
|
||
/// Get the client TLS config | ||
pub(crate) fn tls_config(&self) -> Option<&ClientTlsConfig> { | ||
self.tls_config.as_ref() | ||
} | ||
|
||
/// Get the propose timeout | ||
pub(crate) fn propose_timeout(&self) -> Duration { | ||
self.propose_timeout | ||
} | ||
|
||
/// Get the wait synced timeout | ||
pub(crate) fn wait_synced_timeout(&self) -> Duration { | ||
self.wait_synced_timeout | ||
} | ||
|
||
/// Returns `true` if the current client is on the server | ||
pub(crate) fn is_raw_curp(&self) -> bool { | ||
self.is_raw_curp | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
use std::{collections::HashMap, sync::Arc, time::Duration}; | ||
|
||
use curp_external_api::cmd::Command; | ||
use futures::{future, FutureExt, StreamExt}; | ||
use parking_lot::RwLock; | ||
use tonic::Response; | ||
use tracing::warn; | ||
use utils::parking_lot_lock::RwLockMap; | ||
|
||
use crate::{ | ||
quorum, | ||
rpc::{self, connect::ConnectApi, CurpError, FetchClusterRequest, FetchClusterResponse}, | ||
}; | ||
|
||
use super::cluster_state::ClusterState; | ||
use super::config::Config; | ||
|
||
/// Fetch cluster implementation | ||
#[derive(Debug, Default, Clone)] | ||
pub(crate) struct Fetch { | ||
/// The fetch config | ||
config: Config, | ||
} | ||
|
||
impl Fetch { | ||
/// Creates a new `Fetch` | ||
pub(crate) fn new(config: Config) -> Self { | ||
Self { config } | ||
} | ||
|
||
/// Fetch cluster and updates the current state | ||
pub(crate) async fn fetch_cluster( | ||
&self, | ||
state: ClusterState, | ||
) -> Result<ClusterState, CurpError> { | ||
/// Retry interval | ||
const FETCH_RETRY_INTERVAL: Duration = Duration::from_secs(1); | ||
loop { | ||
let resp = self | ||
.pre_fetch(&state) | ||
.await | ||
.ok_or(CurpError::internal("cluster not available"))?; | ||
let new_members = self.member_addrs(&resp); | ||
let new_connects = self.connect_to(new_members); | ||
let new_state = ClusterState::new( | ||
resp.leader_id | ||
.unwrap_or_else(|| unreachable!("leader id should be Some")) | ||
.into(), | ||
resp.term, | ||
resp.cluster_version, | ||
new_connects, | ||
); | ||
if self.fetch_term(&new_state).await { | ||
return Ok(new_state); | ||
} | ||
warn!("Fetch cluster failed, sleep for {FETCH_RETRY_INTERVAL:?}"); | ||
tokio::time::sleep(FETCH_RETRY_INTERVAL).await; | ||
} | ||
} | ||
|
||
/// Fetch the term of the cluster. This ensures that the current leader is the latest. | ||
async fn fetch_term(&self, state: &ClusterState) -> bool { | ||
let timeout = self.config.wait_synced_timeout(); | ||
let term = state.term(); | ||
let quorum = state.get_quorum(quorum); | ||
state | ||
.for_each_server(|c| async move { | ||
c.fetch_cluster(FetchClusterRequest { linearizable: true }, timeout) | ||
.await | ||
}) | ||
.filter_map(|r| future::ready(r.ok())) | ||
.map(Response::into_inner) | ||
.filter(move |resp| future::ready(resp.term == term)) | ||
.take(quorum) | ||
.count() | ||
.map(move |t| t >= quorum) | ||
.await | ||
} | ||
|
||
/// Prefetch, send fetch cluster request to the cluster and get the | ||
/// config with the greatest quorum. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you mean term number? |
||
async fn pre_fetch(&self, state: &ClusterState) -> Option<FetchClusterResponse> { | ||
let timeout = self.config.wait_synced_timeout(); | ||
let requests = state.for_each_server(|c| async move { | ||
c.fetch_cluster(FetchClusterRequest { linearizable: true }, timeout) | ||
.await | ||
}); | ||
let responses: Vec<_> = requests | ||
.filter_map(|r| future::ready(r.ok())) | ||
.map(Response::into_inner) | ||
.collect() | ||
.await; | ||
responses | ||
.into_iter() | ||
.filter(|resp| resp.leader_id.is_some()) | ||
.filter(|resp| !resp.members.is_empty()) | ||
.max_by(|x, y| x.term.cmp(&y.term)) | ||
} | ||
|
||
/// Gets the member addresses to connect to | ||
fn member_addrs(&self, resp: &FetchClusterResponse) -> HashMap<u64, Vec<String>> { | ||
if self.config.is_raw_curp() { | ||
resp.clone().into_peer_urls() | ||
} else { | ||
resp.clone().into_client_urls() | ||
} | ||
} | ||
|
||
/// Connect to the given addrs | ||
fn connect_to( | ||
&self, | ||
new_members: HashMap<u64, Vec<String>>, | ||
) -> HashMap<u64, Arc<dyn ConnectApi>> { | ||
new_members | ||
.into_iter() | ||
.map(|(id, addrs)| { | ||
let tls_config = self.config.tls_config().cloned(); | ||
(id, rpc::connect(id, addrs, tls_config)) | ||
}) | ||
.collect() | ||
} | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment is not correct