Skip to content

Commit

Permalink
refactor: curp client lease keep alive
Browse files Browse the repository at this point in the history
Signed-off-by: bsbds <[email protected]>
  • Loading branch information
bsbds committed Dec 3, 2024
1 parent 4e525c0 commit b216c9c
Show file tree
Hide file tree
Showing 7 changed files with 525 additions and 367 deletions.
3 changes: 2 additions & 1 deletion crates/curp/src/client/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use super::cluster_state::ClusterState;
use super::config::Config;

/// Fetch cluster implementation
struct Fetch {
#[derive(Debug, Default, Clone)]
pub(crate) struct Fetch {
/// The fetch config
config: Config,
}
Expand Down
104 changes: 104 additions & 0 deletions crates/curp/src/client/keep_alive.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
};

use event_listener::Event;
use futures::Future;
use parking_lot::RwLock;
use tokio::{sync::broadcast, task::JoinHandle};
use tracing::{debug, info, warn};

use super::{cluster_state::ClusterState, state::State};
use crate::rpc::{connect::ConnectApi, CurpError, Redirect};

/// Lease keep alive driver for the curp client
#[derive(Debug, Clone)]
pub(crate) struct KeepAlive {
    /// Heartbeat interval forwarded to the leader's `lease_keep_alive` call
    heartbeat_interval: Duration,
}

/// Handle of the keep alive task
///
/// Returned by `KeepAlive::spawn_keep_alive`. It shares the client id and the
/// update event with the spawned background task.
#[derive(Debug)]
pub(crate) struct KeepAliveHandle {
/// Client id most recently stored by the keep alive task (initialised to 0)
client_id: Arc<AtomicU64>,
/// Event notified by the keep alive task after it stores a new client id
update_event: Arc<Event>,
/// Join handle of the spawned keep alive task
handle: JoinHandle<()>,
}

impl KeepAliveHandle {
    /// Waits until the stored client id differs from `current_id`.
    ///
    /// Returns the new id. If the stored id already differs from
    /// `current_id`, this returns immediately without waiting.
    pub(crate) async fn wait_id_update(&self, current_id: u64) -> u64 {
        loop {
            // Fast path: the id has already been updated, no listener needed.
            let id = self.client_id.load(Ordering::Relaxed);
            if current_id != id {
                return id;
            }

            // Register the listener *before* checking the id again. If the
            // listener were only created after the check, an update landing
            // between the check and `listen()` would notify nobody and this
            // call would block until the next update (possibly forever).
            let listener = self.update_event.listen();

            let id = self.client_id.load(Ordering::Relaxed);
            if current_id != id {
                return id;
            }

            listener.await;
        }
    }
}

impl KeepAlive {
/// Creates a new `KeepAlive`
pub(crate) fn new(heartbeat_interval: Duration) -> Self {
Self { heartbeat_interval }
}

/// Streaming keep alive
pub(crate) fn spawn_keep_alive(
self,
cluster_state: Arc<RwLock<ClusterState>>,
) -> KeepAliveHandle {
/// Sleep duration when keep alive failed
const FAIL_SLEEP_DURATION: Duration = Duration::from_secs(1);
let client_id = Arc::new(AtomicU64::new(0));
let client_id_c = Arc::clone(&client_id);
let update_event = Arc::new(Event::new());
let update_event_c = Arc::clone(&update_event);
let handle = tokio::spawn(async move {
loop {
let current_state = cluster_state.read().clone();
let current_id = client_id.load(Ordering::Relaxed);
match self.keep_alive_with(current_id, current_state).await {
Ok(new_id) => {
client_id.store(new_id, Ordering::Relaxed);
let _ignore = update_event.notify(usize::MAX);
}
Err(e) => {
warn!("keep alive failed: {e:?}");
// Sleep for some time, the cluster state should be updated in a while
tokio::time::sleep(FAIL_SLEEP_DURATION).await;
}
}
}
});

KeepAliveHandle {
client_id: client_id_c,
update_event: update_event_c,
handle,
}
}

/// Keep alive with the given state and config
pub(crate) async fn keep_alive_with(
&self,
client_id: u64,
cluster_state: ClusterState,
) -> Result<u64, CurpError> {
cluster_state
.map_leader(|conn| async move {
conn.lease_keep_alive(client_id, self.heartbeat_interval)
.await
})
.await
}
}
32 changes: 19 additions & 13 deletions crates/curp/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ mod metrics;
/// Unary rpc client
mod unary;

#[cfg(ignore)]
/// Stream rpc client
mod stream;

#[allow(unused)]
/// Retry layer
mod retry;

#[allow(unused)]
/// State for clients
mod state;

Expand All @@ -29,6 +32,10 @@ mod fetch;
/// Config of the client
mod config;

#[allow(unused)]
/// Lease keep alive implementation
mod keep_alive;

/// Tests for client
#[cfg(test)]
mod tests;
Expand All @@ -41,7 +48,6 @@ use async_trait::async_trait;
use curp_external_api::cmd::Command;
use futures::{stream::FuturesUnordered, StreamExt};
use parking_lot::RwLock;
use tokio::task::JoinHandle;
#[cfg(not(madsim))]
use tonic::transport::ClientTlsConfig;
use tracing::{debug, warn};
Expand All @@ -50,6 +56,8 @@ use utils::ClientTlsConfig;
use utils::{build_endpoint, config::ClientConfig};

use self::{
fetch::Fetch,
keep_alive::KeepAlive,
retry::{Retry, RetryConfig},
state::StateBuilder,
unary::{Unary, UnaryConfig},
Expand Down Expand Up @@ -424,16 +432,6 @@ impl ClientBuilder {
)
}

/// Spawn background tasks for the client
///
/// Spawns a tokio task that builds a `stream::Streaming` from `state` and the
/// configured keep alive interval, then awaits `keep_heartbeat`. A debug
/// message is logged once `keep_heartbeat` returns. The `JoinHandle` of the
/// spawned task is returned to the caller.
fn spawn_bg_tasks(&self, state: Arc<state::State>) -> JoinHandle<()> {
// Copy the interval out of the config so the task does not borrow `self`
let interval = *self.config.keep_alive_interval();
tokio::spawn(async move {
let stream = stream::Streaming::new(state, stream::StreamingConfig::new(interval));
stream.keep_heartbeat().await;
debug!("keep heartbeat task shutdown");
})
}

/// Build the client
///
/// # Errors
Expand All @@ -445,10 +443,14 @@ impl ClientBuilder {
) -> Result<impl ClientApi<Error = tonic::Status, Cmd = C> + Send + Sync + 'static, tonic::Status>
{
let state = Arc::new(self.init_state_builder().build());
let keep_alive = KeepAlive::new(*self.config.keep_alive_interval());
// TODO: build the fetch object
let fetch = Fetch::default();
let client = Retry::new(
Unary::new(Arc::clone(&state), self.init_unary_config()),
self.init_retry_config(),
Some(self.spawn_bg_tasks(Arc::clone(&state))),
keep_alive,
fetch,
);

Ok(client)
Expand Down Expand Up @@ -496,10 +498,14 @@ impl<P: Protocol> ClientBuilderWithBypass<P> {
.init_state_builder()
.build_bypassed::<P>(self.local_server_id, self.local_server);
let state = Arc::new(state);
let keep_alive = KeepAlive::new(*self.inner.config.keep_alive_interval());
// TODO: build the fetch object
let fetch = Fetch::default();
let client = Retry::new(
Unary::new(Arc::clone(&state), self.inner.init_unary_config()),
self.inner.init_retry_config(),
Some(self.inner.spawn_bg_tasks(Arc::clone(&state))),
keep_alive,
fetch,
);

Ok(client)
Expand Down
Loading

0 comments on commit b216c9c

Please sign in to comment.