diff --git a/curp/src/members.rs b/curp/src/members.rs
index 42f1a4244..4fbb2a072 100644
--- a/curp/src/members.rs
+++ b/curp/src/members.rs
@@ -6,6 +6,8 @@ use std::{
 use dashmap::{mapref::one::Ref, DashMap};
 use itertools::Itertools;

+use crate::rpc::PbMember;
+
 /// Server Id
 pub type ServerId = u64;

@@ -22,6 +24,17 @@ pub struct Member {
     is_learner: bool,
 }

+impl From<Member> for PbMember {
+    fn from(member: Member) -> Self {
+        Self {
+            id: member.id,
+            name: member.name,
+            addrs: member.addrs,
+            is_learner: member.is_learner,
+        }
+    }
+}
+
 /// Cluster member
 impl Member {
     /// Create a new `Member`
@@ -143,10 +156,10 @@ impl ClusterInfo {
             .addrs = addrs.into();
     }

-    /// Get server address via server id
+    /// Get server addresses via server id
     #[must_use]
     #[inline]
-    pub fn address(&self, id: ServerId) -> Option<Vec<String>> {
+    pub fn addrs(&self, id: ServerId) -> Option<Vec<String>> {
         self.members
             .iter()
             .find(|t| t.id == id)
@@ -162,7 +175,7 @@ impl ClusterInfo {
     /// Get the current server address
     #[must_use]
     #[inline]
-    pub fn self_address(&self) -> Vec<String> {
+    pub fn self_addrs(&self) -> Vec<String> {
         self.self_member().addrs.clone()
     }

@@ -198,6 +211,7 @@ impl ClusterInfo {
         timestamp: Option<u64>,
     ) -> ServerId {
         let mut hasher = DefaultHasher::new();
+        // sort so that the same addrs in a different order produce the same id
         addrs.sort();
         for addr in addrs {
             hasher.write(addr.as_bytes());
@@ -306,7 +320,7 @@ mod tests {
         let node1 = ClusterInfo::new(all_members, "S1");
         let peers = node1.peers_addrs();
         let node1_id = node1.self_id();
-        let node1_url = node1.self_address();
+        let node1_url = node1.self_addrs();
         assert!(!peers.contains_key(&node1_id));
         assert_eq!(peers.len(), 2);
         assert_eq!(node1.members_len(), peers.len() + 1);
diff --git a/curp/src/rpc/connect.rs b/curp/src/rpc/connect.rs
index 745dfbc55..099c8d4eb 100644
--- a/curp/src/rpc/connect.rs
+++ b/curp/src/rpc/connect.rs
@@ -41,68 +41,51 @@ const SNAPSHOT_CHUNK_SIZE: u64 = 64 * 1024;
 /// The default buffer size for rpc connection
 const DEFAULT_BUFFER_SIZE: usize = 1024;

+/// Connect implementation
+macro_rules! connect_impl {
+    ($client:ty, $api:path, $members:ident) => {
+        futures::future::join_all($members.into_iter().map(|(id, mut addrs)| async move {
+            let (channel, change_tx) = Channel::balance_channel(DEFAULT_BUFFER_SIZE);
+            // Addrs must start with "http" to communicate with the server
+            for addr in &mut addrs {
+                if !addr.starts_with("http://") {
+                    addr.insert_str(0, "http://");
+                }
+                let endpoint = Endpoint::from_shared(addr.clone())?;
+                let _ig = change_tx
+                    .send(tower::discover::Change::Insert(addr.clone(), endpoint))
+                    .await;
+            }
+            let client = <$client>::new(channel);
+            let connect: Arc<dyn $api> = Arc::new(Connect {
+                id,
+                rpc_connect: client,
+                change_tx,
+                addrs: Mutex::new(addrs),
+            });
+            Ok((id, connect))
+        }))
+        .await
+        .into_iter()
+        .collect::<Result<Vec<_>, tonic::transport::Error>>()
+        .map(IntoIterator::into_iter)
+    };
+}
+
 /// Convert a vec of addr string to a vec of `Connect`
 /// # Errors
 /// Return error if any of the address format is invalid
 pub(crate) async fn connect(
     members: HashMap<ServerId, Vec<String>>,
 ) -> Result<impl Iterator<Item = (ServerId, Arc<dyn ConnectApi>)>, tonic::transport::Error> {
-    futures::future::join_all(members.into_iter().map(|(id, mut addrs)| async move {
-        let (channel, change_tx) = Channel::balance_channel(DEFAULT_BUFFER_SIZE);
-        // Addrs must start with "http" to communicate with the server
-        for addr in &mut addrs {
-            if !addr.starts_with("http://") {
-                addr.insert_str(0, "http://");
-            }
-            let endpoint = Endpoint::from_shared(addr.clone())?;
-            let _ig = change_tx
-                .send(tower::discover::Change::Insert(addr.clone(), endpoint))
-                .await;
-        }
-        let client = ProtocolClient::new(channel);
-        let connect: Arc<dyn ConnectApi> = Arc::new(Connect {
-            id,
-            rpc_connect: client,
-            change_tx,
-            addrs: Mutex::new(addrs),
-        });
-        Ok((id, connect))
-    }))
-    .await
-    .into_iter()
-    .collect::<Result<Vec<_>, tonic::transport::Error>>()
-    .map(IntoIterator::into_iter)
+    connect_impl!(ProtocolClient, ConnectApi, members)
 }

 /// Convert a vec of addr string to a vec of `InnerConnect`
 pub(crate) async fn inner_connect(
     members: HashMap<ServerId, Vec<String>>,
 ) -> Result<impl Iterator<Item = (ServerId, Arc<dyn InnerConnectApi>)>, tonic::transport::Error> {
-    futures::future::join_all(members.into_iter().map(|(id, mut addrs)| async move {
-        let (channel, change_tx) = Channel::balance_channel(DEFAULT_BUFFER_SIZE);
-        // Addrs must start with "http" to communicate with the server
-        for addr in &mut addrs {
-            if !addr.starts_with("http://") {
-                addr.insert_str(0, "http://");
-            }
-            let endpoint = Endpoint::from_shared(addr.clone())?;
-            let _ig = change_tx
-                .send(tower::discover::Change::Insert(addr.clone(), endpoint))
-                .await;
-        }
-        let client = InnerProtocolClient::new(channel);
-        let connect: Arc<dyn InnerConnectApi> = Arc::new(Connect {
-            id,
-            rpc_connect: client,
-            change_tx,
-            addrs: Mutex::new(addrs),
-        });
-        Ok((id, connect))
-    }))
-    .await
-    .into_iter()
-    .collect::<Result<Vec<_>, tonic::transport::Error>>()
-    .map(IntoIterator::into_iter)
+    connect_impl!(InnerProtocolClient, InnerConnectApi, members)
 }

 /// Connect interface between server and clients
diff --git a/curp/src/rpc/mod.rs b/curp/src/rpc/mod.rs
index 25dc46ef3..91279a1d1 100644
--- a/curp/src/rpc/mod.rs
+++ b/curp/src/rpc/mod.rs
@@ -79,17 +79,6 @@ mod proto {
     }
 }

-impl From<Member> for PbMember {
-    fn from(member: Member) -> Self {
-        Self {
-            id: member.id(),
-            name: member.name().to_owned(),
-            addrs: member.addrs().to_vec(),
-            is_learner: member.is_learner(),
-        }
-    }
-}
-
 impl FetchLeaderRequest {
     /// Create a new `FetchLeaderRequest`
     pub(crate) fn new() -> Self {
diff --git a/curp/src/server/raw_curp/tests.rs b/curp/src/server/raw_curp/tests.rs
index 723614d39..028d41bbf 100644
--- a/curp/src/server/raw_curp/tests.rs
+++ b/curp/src/server/raw_curp/tests.rs
@@ -795,7 +795,7 @@ fn update_node_should_update_the_address_of_node() {
     };
     let follower_id = curp.cluster().get_id_by_name("S1").unwrap();
     assert_eq!(
-        curp.cluster().address(follower_id),
+        curp.cluster().addrs(follower_id),
         Some(vec!["S1".to_owned()])
     );
     let changes = vec![ConfChange::update(
@@ -805,7 +805,7 @@ fn update_node_should_update_the_address_of_node() {
     let resp = curp.apply_conf_change(changes);
     assert!(resp.is_ok());
     assert_eq!(
-        curp.cluster().address(follower_id),
+        curp.cluster().addrs(follower_id),
         Some(vec!["http://127.0.0.1:4567".to_owned()])
     );
 }
diff --git a/xline/src/main.rs b/xline/src/main.rs
index c0bd9cb4a..8e02ef8f1 100644
--- a/xline/src/main.rs
+++ b/xline/src/main.rs
@@ -502,7 +502,7 @@ async fn main() -> Result<()> {
     )
     .await;

-    let self_addr_strings = cluster_config
+    let server_addr_str = cluster_config
         .members()
         .get(cluster_config.name())
         .ok_or_else(|| {
@@ -511,7 +511,7 @@ async fn main() -> Result<()> {
                 cluster_config.name()
             )
         })?;
-    let self_addrs = self_addr_strings
+    let server_addr = server_addr_str
         .iter()
         .map(|addr| {
             addr.to_socket_addrs()?
@@ -521,7 +521,7 @@ async fn main() -> Result<()> {
         .collect::<Result<Vec<_>, _>>()?;

     debug!("name = {:?}", cluster_config.name());
-    debug!("server_addr = {self_addrs:?}");
+    debug!("server_addr = {server_addr:?}");
     debug!("cluster_peers = {:?}", cluster_config.members());

     let name = cluster_config.name().clone();
@@ -539,7 +539,7 @@ async fn main() -> Result<()> {
         *config.compact(),
     );
     debug!("{:?}", server);
-    server.start(self_addrs, db_proxy, key_pair).await?;
+    server.start(server_addr, db_proxy, key_pair).await?;

     tokio::signal::ctrl_c().await?;
     info!("received ctrl-c, shutting down, press ctrl-c again to force exit");
diff --git a/xline/src/server/lease_server.rs b/xline/src/server/lease_server.rs
index 206c2f915..89248f529 100644
--- a/xline/src/server/lease_server.rs
+++ b/xline/src/server/lease_server.rs
@@ -322,7 +322,7 @@ where
         // a follower when it lost the election. Therefore we need to double check here.
         // We can directly invoke leader_keep_alive when a candidate becomes a leader.
         if !self.lease_storage.is_primary() {
-            let leader_addr = self.cluster_info.address(leader_id).unwrap_or_else(|| {
+            let leader_addrs = self.cluster_info.addrs(leader_id).unwrap_or_else(|| {
                 unreachable!(
                     "The address of leader {} not found in all_members {:?}",
                     leader_id, self.cluster_info
@@ -331,7 +331,7 @@ where
                 break self
                     .follower_keep_alive(
                         request_stream,
-                        leader_addr,
+                        leader_addrs,
                         self.shutdown_listener.clone(),
                     )
                     .await?;
@@ -374,14 +374,14 @@ where
             .get_leader_id_from_curp()
             .await
             .map_err(propose_err_to_status)?;
-        let leader_addr = self.cluster_info.address(leader_id).unwrap_or_else(|| {
+        let leader_addrs = self.cluster_info.addrs(leader_id).unwrap_or_else(|| {
             unreachable!(
                 "The address of leader {} not found in all_members {:?}",
                 leader_id, self.cluster_info
             )
         });
         if !self.lease_storage.is_primary() {
-            let endpoints = build_endpoints(leader_addr)?;
+            let endpoints = build_endpoints(leader_addrs)?;
             let channel = tonic::transport::Channel::balance_list(endpoints.into_iter());
             let mut lease_client = LeaseClient::new(channel);
             return lease_client.lease_time_to_live(request).await;
diff --git a/xline/src/server/xline_server.rs b/xline/src/server/xline_server.rs
index 3783c7678..4a2f6b329 100644
--- a/xline/src/server/xline_server.rs
+++ b/xline/src/server/xline_server.rs
@@ -385,7 +385,7 @@ impl XlineServer {
                 Arc::clone(&client),
                 Arc::clone(&id_gen),
                 self.cluster_info.self_name(),
-                self.cluster_info.self_address(),
+                self.cluster_info.self_addrs(),
             ),
             LeaseServer::new(
                 lease_storage,