
Commit

chore: naming stuff
Signed-off-by: iGxnon <[email protected]>
iGxnon committed Sep 26, 2023
1 parent c070a29 commit 283d804
Showing 7 changed files with 62 additions and 76 deletions.
22 changes: 18 additions & 4 deletions curp/src/members.rs
@@ -6,6 +6,8 @@ use std::{
use dashmap::{mapref::one::Ref, DashMap};
use itertools::Itertools;

use crate::rpc::PbMember;

/// Server Id
pub type ServerId = u64;

@@ -22,6 +24,17 @@ pub struct Member {
is_learner: bool,
}

impl From<Member> for PbMember {
fn from(member: Member) -> Self {
Self {
id: member.id,
name: member.name,
addrs: member.addrs,
is_learner: member.is_learner,
}
}
}
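For illustration only (not part of this commit): converting a `Member` into the protobuf `PbMember` through the impl above. The `Member::new(id, name, addrs, is_learner)` constructor arguments are an assumption; only the constructor's existence is visible in this file.

    // Hypothetical usage sketch; the constructor signature is assumed.
    let member = Member::new(1, "S1".to_owned(), vec!["127.0.0.1:2379".to_owned()], false);
    let pb: PbMember = member.into();
    assert_eq!(pb.id, 1);
    assert_eq!(pb.addrs, vec!["127.0.0.1:2379".to_owned()]);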

/// Cluster member
impl Member {
/// Create a new `Member`
@@ -143,10 +156,10 @@ impl ClusterInfo {
.addrs = addrs.into();
}

/// Get server address via server id
/// Get server addresses via server id
#[must_use]
#[inline]
pub fn address(&self, id: ServerId) -> Option<Vec<String>> {
pub fn addrs(&self, id: ServerId) -> Option<Vec<String>> {
self.members
.iter()
.find(|t| t.id == id)
@@ -162,7 +175,7 @@ impl ClusterInfo {
/// Get the current server address
#[must_use]
#[inline]
pub fn self_address(&self) -> Vec<String> {
pub fn self_addrs(&self) -> Vec<String> {
self.self_member().addrs.clone()
}

@@ -198,6 +211,7 @@ impl ClusterInfo {
timestamp: Option<u64>,
) -> ServerId {
let mut hasher = DefaultHasher::new();
// ensure that the same addrs in a different order produce the same id
addrs.sort();
for addr in addrs {
hasher.write(addr.as_bytes());
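For illustration only (not part of the commit): a stripped-down sketch of why the sort above matters; it ignores the name and timestamp inputs of the real id generator. Sorting first makes the generated id independent of the order in which the addresses were supplied.

    use std::collections::hash_map::DefaultHasher;
    use std::hash::Hasher;

    fn id_of(mut addrs: Vec<String>) -> u64 {
        let mut hasher = DefaultHasher::new();
        // Sort so that the same addrs in a different order hash to the same id.
        addrs.sort();
        for addr in addrs {
            hasher.write(addr.as_bytes());
        }
        hasher.finish()
    }

    // id_of(vec!["b".into(), "a".into()]) == id_of(vec!["a".into(), "b".into()])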
@@ -306,7 +320,7 @@ mod tests {
let node1 = ClusterInfo::new(all_members, "S1");
let peers = node1.peers_addrs();
let node1_id = node1.self_id();
let node1_url = node1.self_address();
let node1_url = node1.self_addrs();
assert!(!peers.contains_key(&node1_id));
assert_eq!(peers.len(), 2);
assert_eq!(node1.members_len(), peers.len() + 1);
83 changes: 33 additions & 50 deletions curp/src/rpc/connect.rs
@@ -41,68 +41,51 @@ const SNAPSHOT_CHUNK_SIZE: u64 = 64 * 1024;
/// The default buffer size for rpc connection
const DEFAULT_BUFFER_SIZE: usize = 1024;

/// Connect implementation
macro_rules! connect_impl {
($client:ty, $api:path, $members:ident) => {
futures::future::join_all($members.into_iter().map(|(id, mut addrs)| async move {
let (channel, change_tx) = Channel::balance_channel(DEFAULT_BUFFER_SIZE);
// Addrs must start with "http" to communicate with the server
for addr in &mut addrs {
if !addr.starts_with("http://") {
addr.insert_str(0, "http://");
}
let endpoint = Endpoint::from_shared(addr.clone())?;
let _ig = change_tx
.send(tower::discover::Change::Insert(addr.clone(), endpoint))
.await;
}
let client = <$client>::new(channel);
let connect: Arc<dyn $api> = Arc::new(Connect {
id,
rpc_connect: client,
change_tx,
addrs: Mutex::new(addrs),
});
Ok((id, connect))
}))
.await
.into_iter()
.collect::<Result<Vec<_>, tonic::transport::Error>>()
.map(IntoIterator::into_iter)
};
}

/// Convert a vec of addr string to a vec of `Connect`
/// # Errors
/// Return error if any of the address format is invalid
pub(crate) async fn connect(
members: HashMap<ServerId, Vec<String>>,
) -> Result<impl Iterator<Item = (ServerId, Arc<dyn ConnectApi>)>, tonic::transport::Error> {
futures::future::join_all(members.into_iter().map(|(id, mut addrs)| async move {
let (channel, change_tx) = Channel::balance_channel(DEFAULT_BUFFER_SIZE);
// Addrs must start with "http" to communicate with the server
for addr in &mut addrs {
if !addr.starts_with("http://") {
addr.insert_str(0, "http://");
}
let endpoint = Endpoint::from_shared(addr.clone())?;
let _ig = change_tx
.send(tower::discover::Change::Insert(addr.clone(), endpoint))
.await;
}
let client = ProtocolClient::new(channel);
let connect: Arc<dyn ConnectApi> = Arc::new(Connect {
id,
rpc_connect: client,
change_tx,
addrs: Mutex::new(addrs),
});
Ok((id, connect))
}))
.await
.into_iter()
.collect::<Result<Vec<_>, tonic::transport::Error>>()
.map(IntoIterator::into_iter)
connect_impl!(ProtocolClient<Channel>, ConnectApi, members)
}
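For illustration only: a minimal sketch of how `connect` might be driven, assuming a tokio runtime and two made-up peers. Addresses without an "http://" prefix are normalized inside the macro before endpoints are created.

    use std::collections::HashMap;

    async fn example() -> Result<(), tonic::transport::Error> {
        let members: HashMap<ServerId, Vec<String>> = HashMap::from([
            (1, vec!["127.0.0.1:2379".to_owned()]),
            (2, vec!["http://127.0.0.1:2380".to_owned()]),
        ]);
        // One load-balanced channel is built per member from all of its endpoints.
        let connects: HashMap<_, _> = connect(members).await?.collect();
        assert_eq!(connects.len(), 2);
        Ok(())
    }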

/// Convert a vec of addr string to a vec of `InnerConnect`
pub(crate) async fn inner_connect(
members: HashMap<ServerId, Vec<String>>,
) -> Result<impl Iterator<Item = (ServerId, Arc<dyn InnerConnectApi>)>, tonic::transport::Error> {
futures::future::join_all(members.into_iter().map(|(id, mut addrs)| async move {
let (channel, change_tx) = Channel::balance_channel(DEFAULT_BUFFER_SIZE);
// Addrs must start with "http" to communicate with the server
for addr in &mut addrs {
if !addr.starts_with("http://") {
addr.insert_str(0, "http://");
}
let endpoint = Endpoint::from_shared(addr.clone())?;
let _ig = change_tx
.send(tower::discover::Change::Insert(addr.clone(), endpoint))
.await;
}
let client = InnerProtocolClient::new(channel);
let connect: Arc<dyn InnerConnectApi> = Arc::new(Connect {
id,
rpc_connect: client,
change_tx,
addrs: Mutex::new(addrs),
});
Ok((id, connect))
}))
.await
.into_iter()
.collect::<Result<Vec<_>, tonic::transport::Error>>()
.map(IntoIterator::into_iter)
connect_impl!(InnerProtocolClient<Channel>, InnerConnectApi, members)
}

/// Connect interface between server and clients
11 changes: 0 additions & 11 deletions curp/src/rpc/mod.rs
@@ -79,17 +79,6 @@ mod proto {
}
}

impl From<Member> for PbMember {
fn from(member: Member) -> Self {
Self {
id: member.id(),
name: member.name().to_owned(),
addrs: member.addrs().to_vec(),
is_learner: member.is_learner(),
}
}
}

impl FetchLeaderRequest {
/// Create a new `FetchLeaderRequest`
pub(crate) fn new() -> Self {
4 changes: 2 additions & 2 deletions curp/src/server/raw_curp/tests.rs
@@ -795,7 +795,7 @@ fn update_node_should_update_the_address_of_node() {
};
let follower_id = curp.cluster().get_id_by_name("S1").unwrap();
assert_eq!(
curp.cluster().address(follower_id),
curp.cluster().addrs(follower_id),
Some(vec!["S1".to_owned()])
);
let changes = vec![ConfChange::update(
@@ -805,7 +805,7 @@
let resp = curp.apply_conf_change(changes);
assert!(resp.is_ok());
assert_eq!(
curp.cluster().address(follower_id),
curp.cluster().addrs(follower_id),
Some(vec!["http://127.0.0.1:4567".to_owned()])
);
}
8 changes: 4 additions & 4 deletions xline/src/main.rs
@@ -502,7 +502,7 @@ async fn main() -> Result<()> {
)
.await;

let self_addr_strings = cluster_config
let server_addr_str = cluster_config
.members()
.get(cluster_config.name())
.ok_or_else(|| {
@@ -511,7 +511,7 @@
cluster_config.name()
)
})?;
let self_addrs = self_addr_strings
let server_addr = server_addr_str
.iter()
.map(|addr| {
addr.to_socket_addrs()?
@@ -521,7 +521,7 @@
.collect::<Result<Vec<_>, _>>()?;
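An assumed, self-contained sketch (the helper name is hypothetical) of the resolution step above: turning one configured address string into socket addresses with std's `ToSocketAddrs`.

    use std::net::{SocketAddr, ToSocketAddrs};

    fn resolve(addr: &str) -> std::io::Result<Vec<SocketAddr>> {
        // A "host:port" string may resolve to one or more socket addresses.
        Ok(addr.to_socket_addrs()?.collect())
    }

    // resolve("localhost:2379") typically yields 127.0.0.1:2379 and/or [::1]:2379.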

debug!("name = {:?}", cluster_config.name());
debug!("server_addr = {self_addrs:?}");
debug!("server_addr = {server_addr:?}");
debug!("cluster_peers = {:?}", cluster_config.members());

let name = cluster_config.name().clone();
@@ -539,7 +539,7 @@
*config.compact(),
);
debug!("{:?}", server);
server.start(self_addrs, db_proxy, key_pair).await?;
server.start(server_addr, db_proxy, key_pair).await?;

tokio::signal::ctrl_c().await?;
info!("received ctrl-c, shutting down, press ctrl-c again to force exit");
8 changes: 4 additions & 4 deletions xline/src/server/lease_server.rs
@@ -322,7 +322,7 @@ where
// a follower when it lost the election. Therefore we need to double check here.
// We can directly invoke leader_keep_alive when a candidate becomes a leader.
if !self.lease_storage.is_primary() {
let leader_addr = self.cluster_info.address(leader_id).unwrap_or_else(|| {
let leader_addrs = self.cluster_info.addrs(leader_id).unwrap_or_else(|| {
unreachable!(
"The address of leader {} not found in all_members {:?}",
leader_id, self.cluster_info
@@ -331,7 +331,7 @@
break self
.follower_keep_alive(
request_stream,
leader_addr,
leader_addrs,
self.shutdown_listener.clone(),
)
.await?;
@@ -374,14 +374,14 @@
.get_leader_id_from_curp()
.await
.map_err(propose_err_to_status)?;
let leader_addr = self.cluster_info.address(leader_id).unwrap_or_else(|| {
let leader_addrs = self.cluster_info.addrs(leader_id).unwrap_or_else(|| {
unreachable!(
"The address of leader {} not found in all_members {:?}",
leader_id, self.cluster_info
)
});
if !self.lease_storage.is_primary() {
let endpoints = build_endpoints(leader_addr)?;
let endpoints = build_endpoints(leader_addrs)?;
let channel = tonic::transport::Channel::balance_list(endpoints.into_iter());
let mut lease_client = LeaseClient::new(channel);
return lease_client.lease_time_to_live(request).await;
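For illustration only: a guess at what a `build_endpoints` helper like the one used above could look like; the real helper lives elsewhere in xline, so this signature and body are assumptions.

    use tonic::transport::{Endpoint, Error};

    fn build_endpoints(addrs: Vec<String>) -> Result<Vec<Endpoint>, Error> {
        let mut endpoints = Vec::with_capacity(addrs.len());
        for mut addr in addrs {
            // gRPC endpoints need a scheme, so prepend "http://" when it is missing.
            if !addr.starts_with("http://") {
                addr.insert_str(0, "http://");
            }
            endpoints.push(Endpoint::from_shared(addr)?);
        }
        Ok(endpoints)
    }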
2 changes: 1 addition & 1 deletion xline/src/server/xline_server.rs
@@ -385,7 +385,7 @@ impl XlineServer {
Arc::clone(&client),
Arc::clone(&id_gen),
self.cluster_info.self_name(),
self.cluster_info.self_address(),
self.cluster_info.self_addrs(),
),
LeaseServer::new(
lease_storage,
