Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sendmsg_zc support to tokio-uring. #1

Draft
wants to merge 30 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ keywords = ["async", "fs", "io-uring"]
tokio = { version = "1.2", features = ["net", "rt"] }
slab = "0.4.2"
libc = "0.2.80"
io-uring = { version = "0.5.9", features = ["unstable"] }
io-uring = { version = "0.5.12", features = ["unstable"] }
socket2 = { version = "0.4.4", features = ["all"] }
bytes = { version = "1.0", optional = true }

Expand Down
2 changes: 2 additions & 0 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ mod send_to;

mod send_zc;

mod sendmsg_zc;

mod shared_fd;
pub(crate) use shared_fd::SharedFd;

Expand Down
105 changes: 105 additions & 0 deletions src/io/sendmsg_zc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use crate::buf::IoBuf;
use crate::io::SharedFd;
use crate::runtime::driver::op::{Completable, CqeResult, MultiCQEFuture, Op, Updateable};
use crate::runtime::CONTEXT;
use socket2::SockAddr;
use std::io;
use std::io::IoSlice;
use std::net::SocketAddr;

/// In-flight state for a zero-copy `sendmsg` submitted through the io_uring
/// `SendMsgZc` opcode.
///
/// The `msghdr` handed to the kernel is a table of raw pointers. Every
/// allocation those pointers refer to is owned by this struct so that it
/// outlives the call that created the operation.
pub(crate) struct SendMsgZc<T, U> {
    /// Held only to keep a clone of the socket's shared descriptor with the op.
    #[allow(dead_code)]
    fd: SharedFd,
    /// Caller-supplied payload buffers; handed back on completion.
    #[allow(dead_code)]
    io_bufs: Vec<T>,
    /// The iovec array that `msghdr.msg_iov` points at.
    ///
    /// The `'static` lifetime is a fiction: each slice really borrows from the
    /// matching entry of `io_bufs`. The slices are never read from Rust; they
    /// exist only so the array stays allocated while the kernel may read it.
    #[allow(dead_code)]
    io_slices: Vec<IoSlice<'static>>,
    /// Destination address that `msghdr.msg_name` points at.
    #[allow(dead_code)]
    socket_addr: Box<SockAddr>,
    /// Optional ancillary-data buffer that `msghdr.msg_control` points at.
    msg_control: Option<U>,
    /// Message header passed to the kernel. Boxed so its address does not
    /// change if this struct is moved after the SQE has recorded the pointer.
    msghdr: Box<libc::msghdr>,

    /// Hold the number of transmitted bytes
    bytes: usize,
}

impl<T: IoBuf, U: IoBuf> Op<SendMsgZc<T, U>, MultiCQEFuture> {
    /// Builds and submits a zero-copy `sendmsg` operation.
    ///
    /// * `fd` - socket to send on; a clone is stored in the op state.
    /// * `io_bufs` - payload buffers; the initialized part of each one becomes
    ///   one iovec entry, in order.
    /// * `socket_addr` - destination address placed in `msg_name`.
    /// * `msg_control` - optional ancillary data placed in `msg_control`.
    ///
    /// Returns whatever `submit_op` returns.
    ///
    /// # Panics
    ///
    /// Panics with "Not in a runtime context" when no runtime handle is
    /// available.
    pub(crate) fn sendmsg_zc(
        fd: &SharedFd,
        io_bufs: Vec<T>,
        socket_addr: SocketAddr,
        msg_control: Option<U>,
    ) -> io::Result<Self> {
        use io_uring::{opcode, types};

        let socket_addr = Box::new(SockAddr::from(socket_addr));

        // One iovec entry per buffer. The Vec is moved into the op state below
        // (moving a Vec does not move its heap allocation), so the pointer
        // stored in `msg_iov` remains valid after this function returns.
        let io_slices: Vec<IoSlice<'static>> = io_bufs
            .iter()
            .map(|io_buf| {
                // SAFETY: `stable_ptr()`/`bytes_init()` describe the initialized
                // region of an `IoBuf`, and `io_bufs` is stored alongside the
                // slices in the op state, so the memory outlives the slice.
                IoSlice::new(unsafe {
                    std::slice::from_raw_parts(io_buf.stable_ptr(), io_buf.bytes_init())
                })
            })
            .collect();

        // SAFETY: `msghdr` is a plain C struct of integers and raw pointers,
        // for which the all-zero bit pattern is a valid value.
        let mut msghdr: Box<libc::msghdr> = Box::new(unsafe { std::mem::zeroed() });

        msghdr.msg_iov = io_slices.as_ptr() as *mut _;
        msghdr.msg_iovlen = io_slices.len() as _;
        msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void;
        msghdr.msg_namelen = socket_addr.len();

        // Without ancillary data the zeroed header already holds a null
        // pointer and a zero length.
        if let Some(control) = &msg_control {
            msghdr.msg_control = control.stable_ptr() as *mut _;
            // The field's integer type differs between libc targets.
            msghdr.msg_controllen = control.bytes_init() as _;
        }

        CONTEXT.with(|x| {
            x.handle().expect("Not in a runtime context").submit_op(
                SendMsgZc {
                    fd: fd.clone(),
                    io_bufs,
                    io_slices,
                    socket_addr,
                    msg_control,
                    msghdr,
                    bytes: 0,
                },
                |sendmsg_zc| {
                    // Point the SQE at the boxed header, not at a field inside
                    // the (movable) op state.
                    opcode::SendMsgZc::new(
                        types::Fd(sendmsg_zc.fd.raw_fd()),
                        &*sendmsg_zc.msghdr as *const _,
                    )
                    .build()
                },
            )
        })
    }
}

impl<T, U> Completable for SendMsgZc<T, U> {
    type Output = (io::Result<usize>, Vec<T>, Option<U>);

    /// Finishes the operation, producing the total byte count together with
    /// the buffers that were lent to it.
    ///
    /// On success the count is the value carried by `cqe` plus whatever had
    /// already been accumulated in `bytes`; on failure the error is passed
    /// through unchanged. The data buffers and the optional ancillary buffer
    /// are returned in either case.
    fn complete(self, cqe: CqeResult) -> Self::Output {
        // Take out the pieces the caller gets back; the rest is dropped here.
        let Self {
            io_bufs,
            msg_control,
            bytes,
            ..
        } = self;

        let total = cqe.result.map(|sent| bytes + sent as usize);

        (total, io_bufs, msg_control)
    }
}

impl<T, U> Updateable for SendMsgZc<T, U> {
    /// Adds the byte count reported by an intermediate completion to the
    /// running total kept in `bytes`.
    ///
    /// # Panics
    ///
    /// Panics if `cqe` carries an error.
    fn update(&mut self, cqe: CqeResult) {
        // io_uring's send_zc contract is that a CQE flagged "more" never
        // reports an error, so an `Err` here is treated as a broken invariant.
        let sent = *cqe.result.as_ref().unwrap();
        self.bytes += sent as usize;
    }
}
10 changes: 10 additions & 0 deletions src/io/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,16 @@ impl Socket {
op.await
}

/// Submits a zero-copy `sendmsg` on this socket and waits for it to finish.
///
/// Resolves to the result of the send along with the data buffers and the
/// optional ancillary-data buffer that were passed in.
///
/// # Panics
///
/// Panics if `Op::sendmsg_zc` returns an error when creating the operation.
pub(crate) async fn sendmsg_zc<T: IoBuf, U: IoBuf>(
    &self,
    io_slices: Vec<T>,
    socket_addr: SocketAddr,
    msg_control: Option<U>,
) -> (io::Result<usize>, Vec<T>, Option<U>) {
    Op::sendmsg_zc(&self.fd, io_slices, socket_addr, msg_control)
        .unwrap()
        .await
}

pub(crate) async fn read<T: BoundedBufMut>(&self, buf: T) -> crate::BufResult<usize, T> {
let op = Op::read_at(&self.fd, buf, 0).unwrap();
op.await
Expand Down
14 changes: 13 additions & 1 deletion src/net/udp.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
buf::fixed::FixedBuf,
buf::{BoundedBuf, BoundedBufMut},
buf::{BoundedBuf, BoundedBufMut, IoBuf},
io::{SharedFd, Socket},
};
use socket2::SockAddr;
Expand Down Expand Up @@ -220,6 +220,18 @@ impl UdpSocket {
self.inner.send_zc(buf).await
}

/// Sends a message on the socket using a msghdr, via the zero-copy path.
///
/// * `io_slices` - buffers holding the message payload.
/// * `socket_addr` - address the message is sent to.
/// * `msg_control` - optional buffer of ancillary (control) data.
///
/// The call is forwarded to the inner socket. It resolves to a tuple of the
/// send result (bytes transmitted on success), the payload buffers, and the
/// ancillary buffer, so the caller regains ownership of everything it passed
/// in.
pub async fn sendmsg_zc<T: IoBuf, U: IoBuf>(
    &self,
    io_slices: Vec<T>,
    socket_addr: SocketAddr,
    msg_control: Option<U>,
) -> (io::Result<usize>, Vec<T>, Option<U>) {
    let socket = &self.inner;
    socket.sendmsg_zc(io_slices, socket_addr, msg_control).await
}

/// Receives a single datagram message on the socket. On success, returns
/// the number of bytes read and the origin.
pub async fn recv_from<T: BoundedBufMut>(
Expand Down