Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multi-connection support #83

Merged
merged 14 commits into from
Sep 7, 2022
9 changes: 8 additions & 1 deletion .cargo/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,11 @@
# `narenas` is the maximum number of arenas to use for automatic multiplexing of threads and arenas.
# The default is four times the number of CPUs, or one if there is a single CPU.
# `async-rdma` doesn't need that many automatic arenas so we set it to 1.
JEMALLOC_SYS_WITH_MALLOC_CONF = "narenas:1"

# `tcache` is a feature of `Jemalloc` to speed up memory allocation.
# However, `Jemalloc` may allocate an `MR` with the wrong `arena_index` from `tcache`
# when we create more than one `Jemalloc`-enabled `mr_allocator`.
# So we disable `tcache` by default.
# If you want to enable `tcache` and ensure safety yourself, change
# `JEMALLOC_SYS_WITH_MALLOC_CONF` from `tcache:false` to `tcache:true`.
JEMALLOC_SYS_WITH_MALLOC_CONF = "narenas:1,tcache:false"
18 changes: 14 additions & 4 deletions examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,25 @@ async fn request_then_write_with_imm(rdma: &Rdma) -> io::Result<()> {
#[tokio::main]
async fn main() {
println!("client start");
let rdma = RdmaBuilder::default()
.connect("localhost:5555")
.await
.unwrap();
let addr = "localhost:5555";
let rdma = RdmaBuilder::default().connect(addr).await.unwrap();
println!("connected");
send_data_to_server(&rdma).await.unwrap();
send_data_with_imm_to_server(&rdma).await.unwrap();
send_lmr_to_server(&rdma).await.unwrap();
request_then_write(&rdma).await.unwrap();
request_then_write_with_imm(&rdma).await.unwrap();
println!("client done");

// create new `Rdma`s (connections) that have the same `mr_allocator` and `event_listener` as the parent
GTwhy marked this conversation as resolved.
Show resolved Hide resolved
for _ in 0..3 {
let rdma = rdma.new_connect(addr).await.unwrap();
println!("connected");
send_data_to_server(&rdma).await.unwrap();
send_data_with_imm_to_server(&rdma).await.unwrap();
send_lmr_to_server(&rdma).await.unwrap();
request_then_write(&rdma).await.unwrap();
request_then_write_with_imm(&rdma).await.unwrap();
}
println!("client done");
}
14 changes: 14 additions & 0 deletions examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,18 @@ async fn main() {
.await
.unwrap();
println!("server done");

// create new `Rdma`s (connections) that have the same `mr_allocator` and `event_listener` as the parent
for _ in 0..3 {
let rdma = rdma.listen().await.unwrap();
println!("accepted");
receive_data_from_client(&rdma).await.unwrap();
receive_data_with_imm_from_client(&rdma).await.unwrap();
read_rmr_from_client(&rdma).await.unwrap();
receive_mr_after_being_written(&rdma).await.unwrap();
receive_mr_after_being_written_with_imm(&rdma)
.await
.unwrap();
}
println!("server done");
}
29 changes: 22 additions & 7 deletions src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
queue_pair::QueuePair,
};
use clippy_utilities::Cast;
use rdma_sys::ibv_access_flags;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::SystemTime;
Expand Down Expand Up @@ -59,8 +60,6 @@ pub(crate) struct Agent {
/// Agent thread resource
#[allow(dead_code)]
agent_thread: Arc<AgentThread>,
/// Max message length
max_sr_data_len: usize,
}

impl Drop for Agent {
Expand All @@ -77,9 +76,10 @@ impl Agent {
qp: Arc<QueuePair>,
allocator: Arc<MrAllocator>,
max_sr_data_len: usize,
max_rmr_access: ibv_access_flags,
) -> io::Result<Self> {
let response_waits = Arc::new(parking_lot::Mutex::new(HashMap::new()));
let rmr_manager = RemoteMrManager::new();
let rmr_manager = RemoteMrManager::new(Arc::clone(&qp.pd), max_rmr_access);
let (local_mr_send, local_mr_recv) = channel(1024);
let (remote_mr_send, remote_mr_recv) = channel(1024);
let (data_send, data_recv) = channel(1024);
Expand Down Expand Up @@ -112,7 +112,6 @@ impl Agent {
imm_recv,
handles,
agent_thread,
max_sr_data_len,
})
}

Expand Down Expand Up @@ -209,7 +208,7 @@ impl Agent {
let mut start = 0;
let lm_len = lm.length();
while start < lm_len {
let end = (start.saturating_add(self.max_sr_data_len)).min(lm_len);
let end = (start.saturating_add(self.max_msg_len())).min(lm_len);
let kind = RequestKind::SendData(SendDataRequest {
len: end.wrapping_sub(start),
});
Expand Down Expand Up @@ -266,6 +265,16 @@ impl Agent {
.await
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "imm data channel closed"))
}

/// Upper bound on the length of a single send/recv message.
///
/// Reports the `max_sr_data_len` value stored in the agent's inner state.
pub(crate) fn max_msg_len(&self) -> usize {
    let inner = &self.inner;
    inner.max_sr_data_len
}

/// Maximum access permission used for remote mr requests.
///
/// Reports the `max_rmr_access` flags stored in the inner `rmr_manager`.
pub(crate) fn max_rmr_access(&self) -> ibv_access_flags {
    let manager = &self.inner.rmr_manager;
    manager.max_rmr_access
}
}

/// Agent thread data structure; it actually spawns a task on the tokio thread pool
Expand Down Expand Up @@ -426,9 +435,11 @@ impl AgentThread {
let response = match request.kind {
RequestKind::AllocMR(param) => {
// TODO: error handling
let mr = self.inner.allocator.alloc_zeroed_default(
let mr = self.inner.allocator.alloc_zeroed(
&Layout::from_size_align(param.size, param.align)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?,
self.inner.rmr_manager.max_rmr_access,
&self.inner.rmr_manager.pd,
)?;
// SAFETY: no data race here
let token = unsafe { mr.token_with_timeout_unchecked(param.timeout) }.map_or_else(
Expand Down Expand Up @@ -718,7 +729,7 @@ lazy_static! {
};
}
/// Used for checking if `header_buf` is clean.
const CLEAN_STATE: [u8; 52] = [0_u8; 52];
const CLEAN_STATE: [u8; 56] = [0_u8; 56];

lazy_static! {
static ref REQUEST_HEADER_MAX_LEN: usize = {
Expand All @@ -734,6 +745,7 @@ lazy_static! {
len: 0,
rkey: 0,
ddl: SystemTime::now(),
access:0,
},
}),
RequestKind::SendMR(SendMRRequest {
Expand All @@ -742,6 +754,7 @@ lazy_static! {
len: 0,
rkey: 0,
ddl: SystemTime::now(),
access:0,
}),
}),
RequestKind::SendMR(SendMRRequest {
Expand All @@ -750,6 +763,7 @@ lazy_static! {
len: 0,
rkey: 0,
ddl: SystemTime::now(),
access:0,
}),
}),
RequestKind::SendData(SendDataRequest { len: 0 }),
Expand Down Expand Up @@ -781,6 +795,7 @@ lazy_static! {
len: 0,
rkey: 0,
ddl: SystemTime::now(),
access:0,
},
}),
ResponseKind::ReleaseMR(ReleaseMRResponse { status: 0 }),
Expand Down
5 changes: 3 additions & 2 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ impl Context {

// SAFETY: POD FFI type
let mut inner_port_attr = unsafe { std::mem::zeroed() };
let errno =
unsafe { rdma_sys::___ibv_query_port(inner_ctx.as_ptr(), 1, &mut inner_port_attr) };
let errno = unsafe {
rdma_sys::___ibv_query_port(inner_ctx.as_ptr(), port_num, &mut inner_port_attr)
};
if errno != 0_i32 {
return Err(log_ret_last_os_err_with_note("ibv_query_port failed"));
}
Expand Down
Loading