diff --git a/Cargo.lock b/Cargo.lock index dcc2fc051..b1e55a724 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2490,9 +2490,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.14" +version = "0.23.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "415d9944693cb90382053259f89fbb077ea730ad7273047ec63b19bc9b160ba8" +checksum = "934b404430bb06b3fae2cba809eb45a1ab1aecd64491213d7c3301b88393f8d1" dependencies = [ "log", "once_cell", @@ -2514,9 +2514,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e696e35370c65c9c541198af4543ccd580cf17fc25d8e05c5a242b202488c55" +checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" [[package]] name = "rustls-webpki" diff --git a/crates/curp/src/client/mod.rs b/crates/curp/src/client/mod.rs index 4c93decaf..7cb754135 100644 --- a/crates/curp/src/client/mod.rs +++ b/crates/curp/src/client/mod.rs @@ -163,7 +163,7 @@ impl Drop for ProposeIdGuard<'_> { #[async_trait] trait RepeatableClientApi: ClientApi { /// Generate a unique propose id during the retry process. - fn gen_propose_id(&self) -> Result, Self::Error>; + async fn gen_propose_id(&self) -> Result, Self::Error>; /// Send propose to the whole cluster, `use_fast_path` set to `false` to fallback into ordered /// requests (event the requests are commutative). @@ -422,51 +422,23 @@ impl ClientBuilder { }) } - /// Wait for client id - async fn wait_for_client_id(&self, state: Arc) -> Result<(), tonic::Status> { - /// Max retry count for waiting for a client ID - /// - /// TODO: This retry count is set relatively high to avoid test cluster startup timeouts. - /// We should consider setting this to a more reasonable value. - const RETRY_COUNT: usize = 30; - /// The interval for each retry - const RETRY_INTERVAL: Duration = Duration::from_secs(1); - - for _ in 0..RETRY_COUNT { - if state.client_id() != 0 { - return Ok(()); - } - debug!("waiting for client_id"); - tokio::time::sleep(RETRY_INTERVAL).await; - } - - Err(tonic::Status::deadline_exceeded( - "timeout waiting for client id", - )) - } - /// Build the client /// /// # Errors /// /// Return `tonic::transport::Error` for connection failure. #[inline] - pub async fn build( + pub fn build( &self, ) -> Result + Send + Sync + 'static, tonic::Status> { - let state = Arc::new( - self.init_state_builder() - .build() - .await - .map_err(|e| tonic::Status::internal(e.to_string()))?, - ); + let state = Arc::new(self.init_state_builder().build()); let client = Retry::new( Unary::new(Arc::clone(&state), self.init_unary_config()), self.init_retry_config(), Some(self.spawn_bg_tasks(Arc::clone(&state))), ); - self.wait_for_client_id(state).await?; + Ok(client) } @@ -477,21 +449,14 @@ impl ClientBuilder { /// /// Return `tonic::transport::Error` for connection failure. #[inline] - pub async fn build_with_client_id( + #[must_use] + pub fn build_with_client_id( &self, - ) -> Result< - ( - impl ClientApi + Send + Sync + 'static, - Arc, - ), - tonic::Status, - > { - let state = Arc::new( - self.init_state_builder() - .build() - .await - .map_err(|e| tonic::Status::internal(e.to_string()))?, - ); + ) -> ( + impl ClientApi + Send + Sync + 'static, + Arc, + ) { + let state = Arc::new(self.init_state_builder().build()); let client = Retry::new( Unary::new(Arc::clone(&state), self.init_unary_config()), @@ -499,9 +464,8 @@ impl ClientBuilder { Some(self.spawn_bg_tasks(Arc::clone(&state))), ); let client_id = state.clone_client_id(); - self.wait_for_client_id(state).await?; - Ok((client, client_id)) + (client, client_id) } } @@ -512,22 +476,20 @@ impl ClientBuilderWithBypass

{ /// /// Return `tonic::transport::Error` for connection failure. #[inline] - pub async fn build( + pub fn build( self, ) -> Result, tonic::Status> { let state = self .inner .init_state_builder() - .build_bypassed::

(self.local_server_id, self.local_server) - .await - .map_err(|e| tonic::Status::internal(e.to_string()))?; + .build_bypassed::

(self.local_server_id, self.local_server); let state = Arc::new(state); let client = Retry::new( Unary::new(Arc::clone(&state), self.inner.init_unary_config()), self.inner.init_retry_config(), Some(self.inner.spawn_bg_tasks(Arc::clone(&state))), ); - self.inner.wait_for_client_id(state).await?; + Ok(client) } } diff --git a/crates/curp/src/client/retry.rs b/crates/curp/src/client/retry.rs index 06e670a89..c67db6019 100644 --- a/crates/curp/src/client/retry.rs +++ b/crates/curp/src/client/retry.rs @@ -231,7 +231,7 @@ where use_fast_path: bool, ) -> Result, tonic::Status> { self.retry::<_, _>(|client| async move { - let propose_id = self.inner.gen_propose_id()?; + let propose_id = self.inner.gen_propose_id().await?; RepeatableClientApi::propose(client, *propose_id, cmd, token, use_fast_path).await }) .await @@ -245,7 +245,7 @@ where self.retry::<_, _>(|client| { let changes_c = changes.clone(); async move { - let propose_id = self.inner.gen_propose_id()?; + let propose_id = self.inner.gen_propose_id().await?; RepeatableClientApi::propose_conf_change(client, *propose_id, changes_c).await } }) @@ -255,7 +255,7 @@ where /// Send propose to shutdown cluster async fn propose_shutdown(&self) -> Result<(), tonic::Status> { self.retry::<_, _>(|client| async move { - let propose_id = self.inner.gen_propose_id()?; + let propose_id = self.inner.gen_propose_id().await?; RepeatableClientApi::propose_shutdown(client, *propose_id).await }) .await @@ -272,7 +272,7 @@ where let name_c = node_name.clone(); let node_client_urls_c = node_client_urls.clone(); async move { - let propose_id = self.inner.gen_propose_id()?; + let propose_id = self.inner.gen_propose_id().await?; RepeatableClientApi::propose_publish( client, *propose_id, diff --git a/crates/curp/src/client/state.rs b/crates/curp/src/client/state.rs index 8476e46b8..8a9b53081 100644 --- a/crates/curp/src/client/state.rs +++ b/crates/curp/src/client/state.rs @@ -95,7 +95,8 @@ impl State { tls_config, is_raw_curp: true, }, - client_id: Arc::new(AtomicU64::new(0)), + // Sets the client id to non-zero to avoid waiting for client id in tests + client_id: Arc::new(AtomicU64::new(1)), }) } @@ -146,8 +147,8 @@ impl State { }; let resp = rand_conn .fetch_cluster(FetchClusterRequest::default(), REFRESH_TIMEOUT) - .await?; - self.check_and_update(&resp.into_inner()).await?; + .await; + self.check_and_update(&resp?.into_inner()).await?; Ok(()) } @@ -327,7 +328,7 @@ impl State { .remove(&diff) .unwrap_or_else(|| unreachable!("{diff} must in new member addrs")); debug!("client connects to a new server({diff}), address({addrs:?})"); - let new_conn = rpc::connect(diff, addrs, self.immutable.tls_config.clone()).await?; + let new_conn = rpc::connect(diff, addrs, self.immutable.tls_config.clone()); let _ig = e.insert(new_conn); } else { debug!("client removes old server({diff})"); @@ -347,6 +348,30 @@ impl State { Ok(()) } + + /// Wait for client id + pub(super) async fn wait_for_client_id(&self) -> Result { + /// Max retry count for waiting for a client ID + /// + /// TODO: This retry count is set relatively high to avoid test cluster startup timeouts. + /// We should consider setting this to a more reasonable value. + const RETRY_COUNT: usize = 30; + /// The interval for each retry + const RETRY_INTERVAL: Duration = Duration::from_secs(1); + + for _ in 0..RETRY_COUNT { + let client_id = self.client_id(); + if client_id != 0 { + return Ok(client_id); + } + debug!("waiting for client_id"); + tokio::time::sleep(RETRY_INTERVAL).await; + } + + Err(tonic::Status::deadline_exceeded( + "timeout waiting for client id", + )) + } } /// Builder for state @@ -395,24 +420,22 @@ impl StateBuilder { } /// Build the state with local server - pub(super) async fn build_bypassed( + pub(super) fn build_bypassed( mut self, local_server_id: ServerId, local_server: P, - ) -> Result { + ) -> State { debug!("client bypassed server({local_server_id})"); let _ig = self.all_members.remove(&local_server_id); let mut connects: HashMap<_, _> = - rpc::connects(self.all_members.clone(), self.tls_config.as_ref()) - .await? - .collect(); + rpc::connects(self.all_members.clone(), self.tls_config.as_ref()).collect(); let __ig = connects.insert( local_server_id, Arc::new(BypassedConnect::new(local_server_id, local_server)), ); - Ok(State { + State { mutable: RwLock::new(StateMut { leader: self.leader_state.map(|state| state.0), term: self.leader_state.map_or(0, |state| state.1), @@ -426,16 +449,14 @@ impl StateBuilder { is_raw_curp: self.is_raw_curp, }, client_id: Arc::new(AtomicU64::new(0)), - }) + } } /// Build the state - pub(super) async fn build(self) -> Result { + pub(super) fn build(self) -> State { let connects: HashMap<_, _> = - rpc::connects(self.all_members.clone(), self.tls_config.as_ref()) - .await? - .collect(); - Ok(State { + rpc::connects(self.all_members.clone(), self.tls_config.as_ref()).collect(); + State { mutable: RwLock::new(StateMut { leader: self.leader_state.map(|state| state.0), term: self.leader_state.map_or(0, |state| state.1), @@ -449,6 +470,6 @@ impl StateBuilder { is_raw_curp: self.is_raw_curp, }, client_id: Arc::new(AtomicU64::new(0)), - }) + } } } diff --git a/crates/curp/src/client/tests.rs b/crates/curp/src/client/tests.rs index e97fce4ce..39c8b88bc 100644 --- a/crates/curp/src/client/tests.rs +++ b/crates/curp/src/client/tests.rs @@ -751,7 +751,7 @@ async fn test_stream_client_keep_alive_works() { Box::pin(async move { client_id .compare_exchange( - 0, + 1, 10, std::sync::atomic::Ordering::Relaxed, std::sync::atomic::Ordering::Relaxed, @@ -775,7 +775,7 @@ async fn test_stream_client_keep_alive_on_redirect() { Box::pin(async move { client_id .compare_exchange( - 0, + 1, 10, std::sync::atomic::Ordering::Relaxed, std::sync::atomic::Ordering::Relaxed, diff --git a/crates/curp/src/client/unary/mod.rs b/crates/curp/src/client/unary/mod.rs index 8219ec04b..90986bdb7 100644 --- a/crates/curp/src/client/unary/mod.rs +++ b/crates/curp/src/client/unary/mod.rs @@ -134,7 +134,7 @@ impl ClientApi for Unary { token: Option<&String>, use_fast_path: bool, ) -> Result, CurpError> { - let propose_id = self.gen_propose_id()?; + let propose_id = self.gen_propose_id().await?; RepeatableClientApi::propose(self, *propose_id, cmd, token, use_fast_path).await } @@ -143,13 +143,13 @@ impl ClientApi for Unary { &self, changes: Vec, ) -> Result, CurpError> { - let propose_id = self.gen_propose_id()?; + let propose_id = self.gen_propose_id().await?; RepeatableClientApi::propose_conf_change(self, *propose_id, changes).await } /// Send propose to shutdown cluster async fn propose_shutdown(&self) -> Result<(), CurpError> { - let propose_id = self.gen_propose_id()?; + let propose_id = self.gen_propose_id().await?; RepeatableClientApi::propose_shutdown(self, *propose_id).await } @@ -160,7 +160,7 @@ impl ClientApi for Unary { node_name: String, node_client_urls: Vec, ) -> Result<(), Self::Error> { - let propose_id = self.gen_propose_id()?; + let propose_id = self.gen_propose_id().await?; RepeatableClientApi::propose_publish( self, *propose_id, @@ -306,8 +306,11 @@ impl ClientApi for Unary { #[async_trait] impl RepeatableClientApi for Unary { /// Generate a unique propose id during the retry process. - fn gen_propose_id(&self) -> Result, Self::Error> { - let client_id = self.state.client_id(); + async fn gen_propose_id(&self) -> Result, Self::Error> { + let mut client_id = self.state.client_id(); + if client_id == 0 { + client_id = self.state.wait_for_client_id().await?; + }; let seq_num = self.new_seq_num(); Ok(ProposeIdGuard::new( &self.tracker, diff --git a/crates/curp/src/members.rs b/crates/curp/src/members.rs index ce2045451..5682268f1 100644 --- a/crates/curp/src/members.rs +++ b/crates/curp/src/members.rs @@ -439,8 +439,6 @@ pub async fn get_cluster_info_from_remote( let peers = init_cluster_info.peers_addrs(); let self_client_urls = init_cluster_info.self_client_urls(); let connects = rpc::connects(peers, tls_config) - .await - .ok()? .map(|pair| pair.1) .collect_vec(); let mut futs = connects diff --git a/crates/curp/src/rpc/connect.rs b/crates/curp/src/rpc/connect.rs index d438b6c28..c62b37d31 100644 --- a/crates/curp/src/rpc/connect.rs +++ b/crates/curp/src/rpc/connect.rs @@ -11,7 +11,7 @@ use async_trait::async_trait; use bytes::BytesMut; use clippy_utilities::NumericCast; use engine::SnapshotApi; -use futures::{stream::FuturesUnordered, Stream}; +use futures::Stream; #[cfg(test)] use mockall::automock; use tokio::sync::Mutex; @@ -42,6 +42,7 @@ use crate::{ use super::{ proto::commandpb::{ReadIndexRequest, ReadIndexResponse}, + reconnect::Reconnect, OpResponse, RecordRequest, RecordResponse, }; @@ -69,85 +70,79 @@ impl FromTonicChannel for InnerProtocolClient { } } -/// Connect to a server -async fn connect_to( +/// Creates a new connection +fn connect_to( id: ServerId, addrs: Vec, tls_config: Option, -) -> Result>, tonic::transport::Error> { - let (channel, change_tx) = Channel::balance_channel(DEFAULT_BUFFER_SIZE); +) -> Connect { + let (channel, change_tx) = Channel::balance_channel(DEFAULT_BUFFER_SIZE.max(addrs.len())); for addr in &addrs { - let endpoint = build_endpoint(addr, tls_config.as_ref())?; - let _ig = change_tx - .send(tower::discover::Change::Insert(addr.clone(), endpoint)) - .await; + let endpoint = build_endpoint(addr, tls_config.as_ref()) + .unwrap_or_else(|_| unreachable!("address is ill-formatted")); + change_tx + .try_send(tower::discover::Change::Insert(addr.clone(), endpoint)) + .unwrap_or_else(|_| unreachable!("unknown channel tx send error")); } let client = Client::from_channel(channel); - let connect = Arc::new(Connect { + Connect { id, rpc_connect: client, change_tx, addrs: Mutex::new(addrs), tls_config, - }); - Ok(connect) + } } -/// Connect to a map of members -async fn connect_all( - members: HashMap>, - tls_config: Option<&ClientTlsConfig>, -) -> Result>)>, tonic::transport::Error> { - let conns_to: FuturesUnordered<_> = members - .into_iter() - .map(|(id, addrs)| async move { - connect_to::(id, addrs, tls_config.cloned()) - .await - .map(|conn| (id, conn)) - }) - .collect(); - futures::StreamExt::collect::>(conns_to) - .await - .into_iter() - .collect::, _>>() +/// Creates a new connection with auto reconnect +fn new_reconnect( + id: ServerId, + addrs: Vec, + tls_config: Option, +) -> Reconnect>> { + Reconnect::new(Box::new(move || { + connect_to(id, addrs.clone(), tls_config.clone()) + })) } /// A wrapper of [`connect_to`], hide the detailed [`Connect`] -pub(crate) async fn connect( +pub(crate) fn connect( id: ServerId, addrs: Vec, tls_config: Option, -) -> Result, tonic::transport::Error> { - let conn = connect_to::>(id, addrs, tls_config).await?; - Ok(conn) +) -> Arc { + let conn = new_reconnect(id, addrs, tls_config); + Arc::new(conn) } /// Wrapper of [`connect_all`], hide the details of [`Connect`] -pub(crate) async fn connects( +pub(crate) fn connects( members: HashMap>, tls_config: Option<&ClientTlsConfig>, -) -> Result)>, tonic::transport::Error> { - // It seems that casting high-rank types cannot be inferred, so we allow trivial_casts to cast manually - #[allow(trivial_casts)] - #[allow(clippy::as_conversions)] - let conns = connect_all(members, tls_config) - .await? +) -> impl Iterator)> { + let tls_config = tls_config.cloned(); + members .into_iter() - .map(|(id, conn)| (id, conn as Arc)); - Ok(conns) + .map(move |(id, addrs)| (id, connect(id, addrs, tls_config.clone()))) } /// Wrapper of [`connect_all`], hide the details of [`Connect`] -pub(crate) async fn inner_connects( +pub(crate) fn inner_connects( members: HashMap>, tls_config: Option<&ClientTlsConfig>, -) -> Result, tonic::transport::Error> { - let conns = connect_all(members, tls_config) - .await? - .into_iter() - .map(|(id, conn)| (id, InnerConnectApiWrapper::new_from_arc(conn))); - Ok(conns) +) -> impl Iterator { + let tls_config = tls_config.cloned(); + members.into_iter().map(move |(id, addrs)| { + ( + id, + InnerConnectApiWrapper::new_from_arc(Arc::new(connect_to::< + InnerProtocolClient, + >( + id, addrs, tls_config.clone() + ))), + ) + }) } /// Connect interface between server and clients @@ -282,13 +277,13 @@ impl InnerConnectApiWrapper { } /// Create a new `InnerConnectApiWrapper` from id and addrs - pub(crate) async fn connect( + pub(crate) fn connect( id: ServerId, addrs: Vec, tls_config: Option, - ) -> Result { - let conn = connect_to::>(id, addrs, tls_config).await?; - Ok(InnerConnectApiWrapper::new_from_arc(conn)) + ) -> Self { + let conn = connect_to::>(id, addrs, tls_config); + InnerConnectApiWrapper::new_from_arc(Arc::new(conn)) } } diff --git a/crates/curp/src/rpc/mod.rs b/crates/curp/src/rpc/mod.rs index c064c3bb0..10c56fa99 100644 --- a/crates/curp/src/rpc/mod.rs +++ b/crates/curp/src/rpc/mod.rs @@ -67,6 +67,9 @@ mod metrics; pub(crate) mod connect; pub(crate) use connect::{connect, connects, inner_connects}; +/// Auto reconnect connection +mod reconnect; + // Skip for generated code #[allow( clippy::all, diff --git a/crates/curp/src/rpc/reconnect.rs b/crates/curp/src/rpc/reconnect.rs new file mode 100644 index 000000000..e392db38a --- /dev/null +++ b/crates/curp/src/rpc/reconnect.rs @@ -0,0 +1,198 @@ +use std::{ + sync::{atomic::AtomicU64, Arc}, + time::Duration, +}; + +use async_trait::async_trait; +use event_listener::Event; +use futures::Stream; + +use crate::{ + members::ServerId, + rpc::{ + connect::ConnectApi, CurpError, FetchClusterRequest, FetchClusterResponse, + FetchReadStateRequest, FetchReadStateResponse, MoveLeaderRequest, MoveLeaderResponse, + OpResponse, ProposeConfChangeRequest, ProposeConfChangeResponse, ProposeRequest, + PublishRequest, PublishResponse, ReadIndexResponse, RecordRequest, RecordResponse, + ShutdownRequest, ShutdownResponse, + }, +}; + +/// Auto reconnect of a connection +pub(super) struct Reconnect { + /// Connect id + id: ServerId, + /// The connection + connect: tokio::sync::RwLock>, + /// The connect builder + builder: Box C + Send + Sync + 'static>, + /// Signal to abort heartbeat + event: Event, +} + +impl Reconnect { + /// Creates a new `Reconnect` + pub(crate) fn new(builder: Box C + Send + Sync + 'static>) -> Self { + let init_connect = builder(); + Self { + id: init_connect.id(), + connect: tokio::sync::RwLock::new(Some(init_connect)), + builder, + event: Event::new(), + } + } + + /// Creating a new connection to replace the current + async fn reconnect(&self) { + let new_connect = (self.builder)(); + // Cancel the leader keep alive loop task because it hold a read lock + let _cancel = self.event.notify(1); + let _ignore = self.connect.write().await.replace(new_connect); + // After connection is updated, notify to start the keep alive loop + let _continue = self.event.notify(1); + } + + /// Try to reconnect if the result is `Err` + async fn try_reconnect(&self, result: Result) -> Result { + // TODO: use `tonic::Status` instead of `CurpError`, we can't tell + // if a reconnect is required from `CurpError`. + if matches!( + result, + Err(CurpError::RpcTransport(()) | CurpError::Internal(_)) + ) { + tracing::info!("client reconnecting"); + self.reconnect().await; + } + result + } +} + +/// Execute with reconnect +macro_rules! execute_with_reconnect { + ($self:expr, $trait_method:path, $($arg:expr),*) => {{ + let result = { + let connect = $self.connect.read().await; + let connect_ref = connect.as_ref().unwrap(); + ($trait_method)(connect_ref, $($arg),*).await + }; + $self.try_reconnect(result).await + }}; +} + +#[allow(clippy::unwrap_used, clippy::unwrap_in_result)] +#[async_trait] +impl ConnectApi for Reconnect { + /// Get server id + fn id(&self) -> ServerId { + self.id + } + + /// Update server addresses, the new addresses will override the old ones + async fn update_addrs(&self, addrs: Vec) -> Result<(), tonic::transport::Error> { + let connect = self.connect.read().await; + connect.as_ref().unwrap().update_addrs(addrs).await + } + + /// Send `ProposeRequest` + async fn propose_stream( + &self, + request: ProposeRequest, + token: Option, + timeout: Duration, + ) -> Result< + tonic::Response> + Send>>, + CurpError, + > { + execute_with_reconnect!(self, ConnectApi::propose_stream, request, token, timeout) + } + + /// Send `RecordRequest` + async fn record( + &self, + request: RecordRequest, + timeout: Duration, + ) -> Result, CurpError> { + execute_with_reconnect!(self, ConnectApi::record, request, timeout) + } + + /// Send `ReadIndexRequest` + async fn read_index( + &self, + timeout: Duration, + ) -> Result, CurpError> { + execute_with_reconnect!(self, ConnectApi::read_index, timeout) + } + + /// Send `ProposeRequest` + async fn propose_conf_change( + &self, + request: ProposeConfChangeRequest, + timeout: Duration, + ) -> Result, CurpError> { + execute_with_reconnect!(self, ConnectApi::propose_conf_change, request, timeout) + } + + /// Send `PublishRequest` + async fn publish( + &self, + request: PublishRequest, + timeout: Duration, + ) -> Result, CurpError> { + execute_with_reconnect!(self, ConnectApi::publish, request, timeout) + } + + /// Send `ShutdownRequest` + async fn shutdown( + &self, + request: ShutdownRequest, + timeout: Duration, + ) -> Result, CurpError> { + execute_with_reconnect!(self, ConnectApi::shutdown, request, timeout) + } + + /// Send `FetchClusterRequest` + async fn fetch_cluster( + &self, + request: FetchClusterRequest, + timeout: Duration, + ) -> Result, CurpError> { + execute_with_reconnect!(self, ConnectApi::fetch_cluster, request, timeout) + } + + /// Send `FetchReadStateRequest` + async fn fetch_read_state( + &self, + request: FetchReadStateRequest, + timeout: Duration, + ) -> Result, CurpError> { + execute_with_reconnect!(self, ConnectApi::fetch_read_state, request, timeout) + } + + /// Send `MoveLeaderRequest` + async fn move_leader( + &self, + request: MoveLeaderRequest, + timeout: Duration, + ) -> Result, CurpError> { + execute_with_reconnect!(self, ConnectApi::move_leader, request, timeout) + } + + /// Keep send lease keep alive to server and mutate the client id + async fn lease_keep_alive(&self, client_id: Arc, interval: Duration) -> CurpError { + loop { + let connect = self.connect.read().await; + let connect_ref = connect.as_ref().unwrap(); + tokio::select! { + err = connect_ref.lease_keep_alive(Arc::clone(&client_id), interval) => { + return err; + } + _empty = self.event.listen() => {}, + } + // Creates the listener before dropping the read lock. + // This prevents us from losting the event. + let listener = self.event.listen(); + drop(connect); + let _connection_updated = listener.await; + } + } +} diff --git a/crates/curp/src/server/curp_node.rs b/crates/curp/src/server/curp_node.rs index f2ce80c16..18ff5b4e7 100644 --- a/crates/curp/src/server/curp_node.rs +++ b/crates/curp/src/server/curp_node.rs @@ -690,19 +690,11 @@ impl, RC: RoleChange> CurpNode { }; match change.change_type() { ConfChangeType::Add | ConfChangeType::AddLearner => { - let connect = match InnerConnectApiWrapper::connect( + let connect = InnerConnectApiWrapper::connect( change.node_id, change.address, curp.client_tls_config().cloned(), - ) - .await - { - Ok(connect) => connect, - Err(e) => { - error!("connect to {} failed, {}", change.node_id, e); - continue; - } - }; + ); curp.insert_connect(connect.clone()); let sync_event = curp.sync_event(change.node_id); let remove_event = Arc::new(Event::new()); @@ -842,7 +834,8 @@ impl, RC: RoleChange> CurpNode { /// Create a new server instance #[inline] #[allow(clippy::too_many_arguments)] // TODO: refactor this use builder pattern - pub(super) async fn new( + #[allow(clippy::needless_pass_by_value)] // The value should be consumed + pub(super) fn new( cluster_info: Arc, is_leader: bool, cmd_executor: Arc, @@ -860,10 +853,8 @@ impl, RC: RoleChange> CurpNode { .into_iter() .map(|server_id| (server_id, Arc::new(Event::new()))) .collect(); - let connects = rpc::inner_connects(cluster_info.peers_addrs(), client_tls_config.as_ref()) - .await - .map_err(|e| CurpError::internal(format!("parse peers addresses failed, err {e:?}")))? - .collect(); + let connects = + rpc::inner_connects(cluster_info.peers_addrs(), client_tls_config.as_ref()).collect(); let cmd_board = Arc::new(RwLock::new(CommandBoard::new())); let lease_manager = Arc::new(RwLock::new(LeaseManager::new())); let last_applied = cmd_executor diff --git a/crates/curp/src/server/mod.rs b/crates/curp/src/server/mod.rs index 2a10c8925..b4e012f62 100644 --- a/crates/curp/src/server/mod.rs +++ b/crates/curp/src/server/mod.rs @@ -263,7 +263,7 @@ impl, RC: RoleChange> Rpc { /// Panic if storage creation failed #[inline] #[allow(clippy::too_many_arguments)] // TODO: refactor this use builder pattern - pub async fn new( + pub fn new( cluster_info: Arc, is_leader: bool, executor: Arc, @@ -289,9 +289,7 @@ impl, RC: RoleChange> Rpc { client_tls_config, sps, ucps, - ) - .await - { + ) { Ok(n) => n, Err(err) => { panic!("failed to create curp service, {err:?}"); @@ -345,8 +343,7 @@ impl, RC: RoleChange> Rpc { client_tls_config, sps, ucps, - ) - .await; + ); tonic::transport::Server::builder() .add_service(ProtocolServer::new(server.clone())) diff --git a/crates/curp/tests/it/common/curp_group.rs b/crates/curp/tests/it/common/curp_group.rs index e7ee921ec..45a511735 100644 --- a/crates/curp/tests/it/common/curp_group.rs +++ b/crates/curp/tests/it/common/curp_group.rs @@ -136,22 +136,19 @@ impl CurpGroup { let role_change_cb = TestRoleChange::default(); let role_change_arc = role_change_cb.get_inner_arc(); let curp_storage = Arc::new(DB::open(&config.engine_cfg).unwrap()); - let server = Arc::new( - Rpc::new( - cluster_info, - name == leader_name, - ce, - snapshot_allocator, - role_change_cb, - config, - curp_storage, - Arc::clone(&task_manager), - client_tls_config.clone(), - vec![Box::::default()], - vec![Box::::default()], - ) - .await, - ); + let server = Arc::new(Rpc::new( + cluster_info, + name == leader_name, + ce, + snapshot_allocator, + role_change_cb, + config, + curp_storage, + Arc::clone(&task_manager), + client_tls_config.clone(), + vec![Box::::default()], + vec![Box::::default()], + )); task_manager.spawn(TaskName::TonicServer, |n| async move { let ig = Self::run(server, listener, n).await; }); @@ -268,22 +265,19 @@ impl CurpGroup { let role_change_cb = TestRoleChange::default(); let role_change_arc = role_change_cb.get_inner_arc(); let curp_storage = Arc::new(DB::open(&config.engine_cfg).unwrap()); - let server = Arc::new( - Rpc::new( - cluster_info, - false, - ce, - snapshot_allocator, - role_change_cb, - config, - curp_storage, - Arc::clone(&task_manager), - self.client_tls_config.clone(), - vec![], - vec![], - ) - .await, - ); + let server = Arc::new(Rpc::new( + cluster_info, + false, + ce, + snapshot_allocator, + role_change_cb, + config, + curp_storage, + Arc::clone(&task_manager), + self.client_tls_config.clone(), + vec![], + vec![], + )); task_manager.spawn(TaskName::TonicServer, |n| async move { let _ig = Self::run(server, listener, n).await; }); @@ -329,7 +323,6 @@ impl CurpGroup { .await .unwrap() .build() - .await .unwrap() } diff --git a/crates/curp/tests/it/server.rs b/crates/curp/tests/it/server.rs index 4d00cc1ce..bbb869186 100644 --- a/crates/curp/tests/it/server.rs +++ b/crates/curp/tests/it/server.rs @@ -455,7 +455,6 @@ async fn shutdown_rpc_should_shutdown_the_cluster_when_client_has_wrong_leader() .leader_state(follower_id, 0) .all_members(group.all_addrs_map()) .build::() - .await .unwrap(); client.propose_shutdown().await.unwrap(); @@ -477,7 +476,6 @@ async fn propose_conf_change_to_follower() { .leader_state(follower_id, 0) .all_members(group.all_addrs_map()) .build::() - .await .unwrap(); let node_id = group.nodes.keys().next().copied().unwrap(); diff --git a/crates/simulation/src/curp_group.rs b/crates/simulation/src/curp_group.rs index 1d87524f3..8c8b8dff8 100644 --- a/crates/simulation/src/curp_group.rs +++ b/crates/simulation/src/curp_group.rs @@ -195,10 +195,8 @@ impl CurpGroup { ClientBuilder::new(config, true) .all_members(all_members) .build_with_client_id() - .await }) .await - .unwrap() .unwrap(); SimClient { inner: Arc::new(client), diff --git a/crates/simulation/tests/it/curp/server_recovery.rs b/crates/simulation/tests/it/curp/server_recovery.rs index 7e8a88ccf..3e8c85125 100644 --- a/crates/simulation/tests/it/curp/server_recovery.rs +++ b/crates/simulation/tests/it/curp/server_recovery.rs @@ -457,6 +457,13 @@ async fn recovery_after_compaction() { async fn overwritten_config_should_fallback() { init_logger(); let group = CurpGroup::new(5).await; + let client = group.new_client().await; + // A workaround for dedup. The client will lazily acquire an id from the leader during a + // propose. + let _wait_for_client_id = client + .propose(TestCommand::new_put(vec![0], 0), false) + .await; + let client_id = client.client_id(); let leader1 = group.get_leader().await.0; for node in group.nodes.values().filter(|node| node.id != leader1) { group.disable_node(node.id); @@ -468,13 +475,13 @@ async fn overwritten_config_should_fallback() { let node_id = 123; let address = vec!["127.0.0.1:4567".to_owned()]; let changes = vec![ConfChange::add(node_id, address)]; - let client = group.new_client().await; let res = leader_conn .propose_conf_change( ProposeConfChangeRequest { propose_id: Some(PbProposeId { - client_id: client.client_id(), - seq_num: 0, + client_id, + // start from 1 as we already propose an put with seq_num = 0 + seq_num: 1, }), changes, cluster_version: cluster.cluster_version, diff --git a/crates/xline-client/src/lib.rs b/crates/xline-client/src/lib.rs index 77e42cb78..fc8d1f1da 100644 --- a/crates/xline-client/src/lib.rs +++ b/crates/xline-client/src/lib.rs @@ -244,8 +244,7 @@ impl Client { .tls_config(options.tls_config) .discover_from(addrs) .await? - .build::() - .await?, + .build::()?, ) as Arc; let id_gen = Arc::new(lease_gen::LeaseIdGenerator::new()); diff --git a/crates/xline/src/server/xline_server.rs b/crates/xline/src/server/xline_server.rs index b940c8489..20761c575 100644 --- a/crates/xline/src/server/xline_server.rs +++ b/crates/xline/src/server/xline_server.rs @@ -518,8 +518,7 @@ impl XlineServer { self.client_tls_config.clone(), XlineSpeculativePools::new(Arc::clone(&lease_collection)).into_inner(), XlineUncommittedPools::new(lease_collection).into_inner(), - ) - .await; + ); let client = Arc::new( CurpClientBuilder::new(*self.cluster_config.client_config(), false) @@ -527,8 +526,7 @@ impl XlineServer { .cluster_version(self.cluster_info.cluster_version()) .all_members(self.cluster_info.all_members_peer_urls()) .bypass(self.cluster_info.self_id(), curp_server.clone()) - .build::() - .await?, + .build::()?, ) as Arc; if let Some(compactor) = auto_compactor_c { diff --git a/crates/xline/tests/it/lock_test.rs b/crates/xline/tests/it/lock_test.rs index d89231f03..29dc9a19b 100644 --- a/crates/xline/tests/it/lock_test.rs +++ b/crates/xline/tests/it/lock_test.rs @@ -1,4 +1,4 @@ -use std::{error::Error, time::Duration}; +use std::{error::Error, sync::Arc, time::Duration}; use test_macros::abort_on_panic; use tokio::time::{sleep, Instant}; @@ -11,17 +11,20 @@ async fn test_lock() -> Result<(), Box> { cluster.start().await; let client = cluster.client().await; let lock_client = client.lock_client(); + let event = Arc::new(event_listener::Event::new()); let lock_handle = tokio::spawn({ let c = lock_client.clone(); + let event = Arc::clone(&event); async move { let mut xutex = Xutex::new(c, "test", None, None).await.unwrap(); let _lock = xutex.lock_unsafe().await.unwrap(); - sleep(Duration::from_secs(3)).await; + let _notified = event.notify(1); + sleep(Duration::from_secs(2)).await; } }); - sleep(Duration::from_secs(1)).await; + event.listen().await; let now = Instant::now(); let mut xutex = Xutex::new(lock_client, "test", None, None).await?;