From 94a9ec6bc10ef94d06378b0ff05eed2d2577586a Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Tue, 10 Jan 2023 13:38:21 +0000 Subject: [PATCH 01/29] Start adding stuff to support sendmsg_zc. --- src/io/mod.rs | 2 ++ src/io/sendmsg_zc.rs | 69 ++++++++++++++++++++++++++++++++++++++++++++ src/io/socket.rs | 5 ++++ src/net/udp.rs | 4 +++ 4 files changed, 80 insertions(+) create mode 100644 src/io/sendmsg_zc.rs diff --git a/src/io/mod.rs b/src/io/mod.rs index c92687ed..d6386f77 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -28,6 +28,8 @@ mod send_to; mod send_zc; +mod sendmsg_zc; + mod shared_fd; pub(crate) use shared_fd::SharedFd; diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs new file mode 100644 index 00000000..0f04229f --- /dev/null +++ b/src/io/sendmsg_zc.rs @@ -0,0 +1,69 @@ +use crate::buf::BoundedBuf; +use crate::io::SharedFd; +use crate::runtime::driver::op::{Completable, CqeResult, Op}; +use crate::runtime::CONTEXT; +use crate::BufResult; +use socket2::SockAddr; +use std::io::IoSlice; +use std::{boxed::Box, io, net::SocketAddr}; + +pub(crate) struct SendMsgZc { + #[allow(dead_code)] + fd: SharedFd, + + /*pub(crate) buf: T, + + #[allow(dead_code)] + io_slices: Vec>, + + #[allow(dead_code)] + socket_addr: Box,*/ + + pub(crate) msghdr: Box, +} + +impl Op, MultiCQEFuture> { + pub(crate) fn sendmsg_zc(fd: &SharedFd, msghdr: Box) -> io::Result { + use io_uring::{opcode, types}; + + /*let io_slices = vec![IoSlice::new(unsafe { + std::slice::from_raw_parts(buf.stable_ptr(), buf.bytes_init()) + })]; + + let socket_addr = Box::new(SockAddr::from(socket_addr)); + + let mut msghdr: Box = Box::new(unsafe { std::mem::zeroed() }); + msghdr.msg_iov = io_slices.as_ptr() as *mut _; + msghdr.msg_iovlen = io_slices.len() as _; + msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void; + msghdr.msg_namelen = socket_addr.len();*/ + + CONTEXT.with(|x| { + x.handle().expect("Not in a runtime context").submit_op( + SendMsgZc { + fd: fd.clone(), + /*buf, + io_slices, + socket_addr,*/ + msghdr: msghdr.clone(), + }, + |sendmsg_zc| { + opcode::SendMsgZc::new(types::Fd(sendmsg_zc.fd.raw_fd()), sendmsg_zc.msghdr.as_ref() as *const _).build() + }, + ) + }) + } +} + +impl Completable for SendMsgZc { + type Output = BufResult; + + fn complete(self, cqe: CqeResult) -> Self::Output { + // Convert the operation result to `usize` + let res = cqe.result.map(|v| v as usize); + // Recover the buffer + let buf = self.buf; + + (res, buf) + } +} diff --git a/src/io/socket.rs b/src/io/socket.rs index 5e6bc8cf..d2ef6e9c 100644 --- a/src/io/socket.rs +++ b/src/io/socket.rs @@ -147,6 +147,11 @@ impl Socket { op.await } + pub(crate) async fn sendmsg_zc(&self, msghdr: T) -> crate::BufResult { + let op = Op::sendmsg_zc(&self.fd, msghdr).unwrap(); + op.await + } + pub(crate) async fn read(&self, buf: T) -> crate::BufResult { let op = Op::read_at(&self.fd, buf, 0).unwrap(); op.await diff --git a/src/net/udp.rs b/src/net/udp.rs index 47d4dee6..76320cb6 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -220,6 +220,10 @@ impl UdpSocket { self.inner.send_zc(buf).await } + pub async fn sendmsg_zc>(&self, msghdr: T) -> crate::BufResult { + self.inner.sendmsg_zc(msghdr).await + } + /// Receives a single datagram message on the socket. On success, returns /// the number of bytes read and the origin. pub async fn recv_from( From 78f3f77d53b93bcf5b7e9cafd4af7426a5284091 Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Tue, 10 Jan 2023 13:38:32 +0000 Subject: [PATCH 02/29] Update --- src/io/sendmsg_zc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index 0f04229f..4f53929f 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -22,7 +22,7 @@ pub(crate) struct SendMsgZc { pub(crate) msghdr: Box, } -impl Op, MultiCQEFuture> { +impl> Op, MultiCQEFuture> { pub(crate) fn sendmsg_zc(fd: &SharedFd, msghdr: Box) -> io::Result { use io_uring::{opcode, types}; From fc8c89a52cacf09277264786c0a8e060140f20d0 Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Thu, 12 Jan 2023 09:43:04 +0000 Subject: [PATCH 03/29] Updated SendMsgZc stuff. --- src/io/sendmsg_zc.rs | 47 ++++++++++++++++++++------------------------ src/io/socket.rs | 2 +- src/net/udp.rs | 2 +- 3 files changed, 23 insertions(+), 28 deletions(-) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index 4f53929f..ade5ac6f 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -1,29 +1,21 @@ -use crate::buf::BoundedBuf; +//use crate::buf::BoundedBuf; use crate::io::SharedFd; -use crate::runtime::driver::op::{Completable, CqeResult, Op}; +use crate::runtime::driver::op::{Completable, CqeResult, MultiCQEFuture, Op, Updateable}; use crate::runtime::CONTEXT; -use crate::BufResult; -use socket2::SockAddr; -use std::io::IoSlice; -use std::{boxed::Box, io, net::SocketAddr}; +use std::io; -pub(crate) struct SendMsgZc { +pub(crate) struct SendMsgZc { #[allow(dead_code)] fd: SharedFd, - /*pub(crate) buf: T, - - #[allow(dead_code)] - io_slices: Vec>, - - #[allow(dead_code)] - socket_addr: Box,*/ + pub(crate) msghdr: libc::msghdr, - pub(crate) msghdr: Box, + /// Hold the number of transmitted bytes + bytes: usize, } -impl> Op, MultiCQEFuture> { - pub(crate) fn sendmsg_zc(fd: &SharedFd, msghdr: Box) -> io::Result { +impl Op { + pub(crate) fn sendmsg_zc(fd: &SharedFd, msghdr: &libc::msghdr) -> io::Result { use io_uring::{opcode, types}; /*let io_slices = vec![IoSlice::new(unsafe { @@ -42,28 +34,31 @@ impl> Op, MultiCQEFuture> { x.handle().expect("Not in a runtime context").submit_op( SendMsgZc { fd: fd.clone(), - /*buf, - io_slices, - socket_addr,*/ msghdr: msghdr.clone(), + bytes: 0, }, |sendmsg_zc| { - opcode::SendMsgZc::new(types::Fd(sendmsg_zc.fd.raw_fd()), sendmsg_zc.msghdr.as_ref() as *const _).build() + opcode::SendMsgZc::new(types::Fd(sendmsg_zc.fd.raw_fd()), &sendmsg_zc.msghdr as *const _).build() }, ) }) } } -impl Completable for SendMsgZc { - type Output = BufResult; +impl Completable for SendMsgZc { + type Output = io::Result; fn complete(self, cqe: CqeResult) -> Self::Output { // Convert the operation result to `usize` let res = cqe.result.map(|v| v as usize); - // Recover the buffer - let buf = self.buf; + + res + } +} - (res, buf) +impl Updateable for SendMsgZc { + fn update(&mut self, cqe: CqeResult) { + // uring send_zc promises there will be no error on CQE's marked more + self.bytes += *cqe.result.as_ref().unwrap() as usize; } } diff --git a/src/io/socket.rs b/src/io/socket.rs index d2ef6e9c..dc04dd99 100644 --- a/src/io/socket.rs +++ b/src/io/socket.rs @@ -147,7 +147,7 @@ impl Socket { op.await } - pub(crate) async fn sendmsg_zc(&self, msghdr: T) -> crate::BufResult { + pub(crate) async fn sendmsg_zc(&self, msghdr: &libc::msghdr) -> io::Result { let op = Op::sendmsg_zc(&self.fd, msghdr).unwrap(); op.await } diff --git a/src/net/udp.rs b/src/net/udp.rs index 76320cb6..cc4b1bbf 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -220,7 +220,7 @@ impl UdpSocket { self.inner.send_zc(buf).await } - pub async fn sendmsg_zc>(&self, msghdr: T) -> crate::BufResult { + pub async fn sendmsg_zc(&self, msghdr: &libc::msghdr) -> io::Result { self.inner.sendmsg_zc(msghdr).await } From f81b58cc21f8d44fcd63b1c51dfb3a20cb6f320b Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Thu, 12 Jan 2023 09:47:19 +0000 Subject: [PATCH 04/29] Cleanup --- src/io/sendmsg_zc.rs | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index ade5ac6f..511b4d9e 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -18,18 +18,6 @@ impl Op { pub(crate) fn sendmsg_zc(fd: &SharedFd, msghdr: &libc::msghdr) -> io::Result { use io_uring::{opcode, types}; - /*let io_slices = vec![IoSlice::new(unsafe { - std::slice::from_raw_parts(buf.stable_ptr(), buf.bytes_init()) - })]; - - let socket_addr = Box::new(SockAddr::from(socket_addr)); - - let mut msghdr: Box = Box::new(unsafe { std::mem::zeroed() }); - msghdr.msg_iov = io_slices.as_ptr() as *mut _; - msghdr.msg_iovlen = io_slices.len() as _; - msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void; - msghdr.msg_namelen = socket_addr.len();*/ - CONTEXT.with(|x| { x.handle().expect("Not in a runtime context").submit_op( SendMsgZc { From 18b3f76be6bdc77a98ce18d050f3ee3d5e338e7c Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Thu, 12 Jan 2023 11:12:42 +0000 Subject: [PATCH 05/29] Get updated io-uring. --- Cargo.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index be1976e5..d5e2e246 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,3 +48,6 @@ harness = false name = "criterion_no_op" path = "benches/criterion/no_op.rs" harness = false + +[patch.crates-io] +io-uring = { git = 'https://github.com/tokio-rs/io-uring', branch = 'master' } \ No newline at end of file From 863ca228b60d53a69d8d9468fa38627e2111b70f Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Tue, 17 Jan 2023 09:13:59 +0000 Subject: [PATCH 06/29] Update return types. --- src/io/sendmsg_zc.rs | 8 +++++--- src/io/socket.rs | 2 +- src/net/udp.rs | 2 +- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index 511b4d9e..2d6f373a 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -34,13 +34,15 @@ impl Op { } impl Completable for SendMsgZc { - type Output = io::Result; + //type Output = io::Result; - fn complete(self, cqe: CqeResult) -> Self::Output { + fn complete(self, cqe: CqeResult) -> (libc::msghdr, io::Result) { // Convert the operation result to `usize` let res = cqe.result.map(|v| v as usize); + + let msghdr = self.msghdr; - res + (msghdr, res) } } diff --git a/src/io/socket.rs b/src/io/socket.rs index dc04dd99..8159d74d 100644 --- a/src/io/socket.rs +++ b/src/io/socket.rs @@ -147,7 +147,7 @@ impl Socket { op.await } - pub(crate) async fn sendmsg_zc(&self, msghdr: &libc::msghdr) -> io::Result { + pub(crate) async fn sendmsg_zc(&self, msghdr: &libc::msghdr) -> (libc::msghdr, io::Result) { let op = Op::sendmsg_zc(&self.fd, msghdr).unwrap(); op.await } diff --git a/src/net/udp.rs b/src/net/udp.rs index cc4b1bbf..4ec3bb72 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -220,7 +220,7 @@ impl UdpSocket { self.inner.send_zc(buf).await } - pub async fn sendmsg_zc(&self, msghdr: &libc::msghdr) -> io::Result { + pub async fn sendmsg_zc(&self, msghdr: &libc::msghdr) -> (libc::msghdr, io::Result) { self.inner.sendmsg_zc(msghdr).await } From 14c5d746e64c72340d2a8310e9a967481011be61 Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Tue, 17 Jan 2023 09:16:48 +0000 Subject: [PATCH 07/29] Update --- src/io/sendmsg_zc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index 2d6f373a..e88f4455 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -34,7 +34,7 @@ impl Op { } impl Completable for SendMsgZc { - //type Output = io::Result; + type Output = (libc::msghdr, io::Result) ; fn complete(self, cqe: CqeResult) -> (libc::msghdr, io::Result) { // Convert the operation result to `usize` From 3bc451b3eff124788e4a9592a1093e0861861c1d Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Tue, 17 Jan 2023 09:19:17 +0000 Subject: [PATCH 08/29] Added comment. --- src/io/sendmsg_zc.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index e88f4455..335e5420 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -40,6 +40,7 @@ impl Completable for SendMsgZc { // Convert the operation result to `usize` let res = cqe.result.map(|v| v as usize); + // Recover the msghdr. let msghdr = self.msghdr; (msghdr, res) From 07106a79cc8c1a63d2233664a0266c8994d44d00 Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Tue, 17 Jan 2023 09:41:13 +0000 Subject: [PATCH 09/29] Fix rust-fmt + add comment. --- src/io/sendmsg_zc.rs | 13 ++++++++----- src/io/socket.rs | 5 ++++- src/net/udp.rs | 3 ++- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index 335e5420..be3149c0 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -1,4 +1,3 @@ -//use crate::buf::BoundedBuf; use crate::io::SharedFd; use crate::runtime::driver::op::{Completable, CqeResult, MultiCQEFuture, Op, Updateable}; use crate::runtime::CONTEXT; @@ -22,11 +21,15 @@ impl Op { x.handle().expect("Not in a runtime context").submit_op( SendMsgZc { fd: fd.clone(), - msghdr: msghdr.clone(), + msghdr: msghdr.copy(), bytes: 0, }, |sendmsg_zc| { - opcode::SendMsgZc::new(types::Fd(sendmsg_zc.fd.raw_fd()), &sendmsg_zc.msghdr as *const _).build() + opcode::SendMsgZc::new( + types::Fd(sendmsg_zc.fd.raw_fd()), + &sendmsg_zc.msghdr as *const _ + ) + .build() }, ) }) @@ -34,7 +37,7 @@ impl Op { } impl Completable for SendMsgZc { - type Output = (libc::msghdr, io::Result) ; + type Output = (libc::msghdr, io::Result); fn complete(self, cqe: CqeResult) -> (libc::msghdr, io::Result) { // Convert the operation result to `usize` @@ -42,7 +45,7 @@ impl Completable for SendMsgZc { // Recover the msghdr. let msghdr = self.msghdr; - + (msghdr, res) } } diff --git a/src/io/socket.rs b/src/io/socket.rs index 8159d74d..dd09855c 100644 --- a/src/io/socket.rs +++ b/src/io/socket.rs @@ -147,7 +147,10 @@ impl Socket { op.await } - pub(crate) async fn sendmsg_zc(&self, msghdr: &libc::msghdr) -> (libc::msghdr, io::Result) { + pub(crate) async fn sendmsg_zc( + &self, + msghdr: &libc::msghdr + ) -> (libc::msghdr, io::Result) { let op = Op::sendmsg_zc(&self.fd, msghdr).unwrap(); op.await } diff --git a/src/net/udp.rs b/src/net/udp.rs index 4ec3bb72..28885af7 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -220,7 +220,8 @@ impl UdpSocket { self.inner.send_zc(buf).await } - pub async fn sendmsg_zc(&self, msghdr: &libc::msghdr) -> (libc::msghdr, io::Result) { + /// Sends a message on the socket using a msghdr. + pub async fn sendmsg_zc(&self, msghdr: &libc::msghdr) -> (libc::msghdr, io::Result) { self.inner.sendmsg_zc(msghdr).await } From cb8d9519e23da8fded0b51bbed986583b655245b Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Tue, 17 Jan 2023 09:47:08 +0000 Subject: [PATCH 10/29] Update --- src/io/sendmsg_zc.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index be3149c0..bbf792ee 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -21,13 +21,13 @@ impl Op { x.handle().expect("Not in a runtime context").submit_op( SendMsgZc { fd: fd.clone(), - msghdr: msghdr.copy(), + msghdr: msghdr.clone(), bytes: 0, }, |sendmsg_zc| { opcode::SendMsgZc::new( types::Fd(sendmsg_zc.fd.raw_fd()), - &sendmsg_zc.msghdr as *const _ + &sendmsg_zc.msghdr as *const _, ) .build() }, From 7f02991740bec324e8eff0379acb5224b36848a7 Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Tue, 17 Jan 2023 09:49:23 +0000 Subject: [PATCH 11/29] Update --- src/io/sendmsg_zc.rs | 4 ++-- src/io/socket.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index bbf792ee..136c75c1 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -21,12 +21,12 @@ impl Op { x.handle().expect("Not in a runtime context").submit_op( SendMsgZc { fd: fd.clone(), - msghdr: msghdr.clone(), + msghdr: (*msghdr).clone(), bytes: 0, }, |sendmsg_zc| { opcode::SendMsgZc::new( - types::Fd(sendmsg_zc.fd.raw_fd()), + types::Fd(sendmsg_zc.fd.raw_fd()), &sendmsg_zc.msghdr as *const _, ) .build() diff --git a/src/io/socket.rs b/src/io/socket.rs index dd09855c..3c8bc1a2 100644 --- a/src/io/socket.rs +++ b/src/io/socket.rs @@ -148,8 +148,8 @@ impl Socket { } pub(crate) async fn sendmsg_zc( - &self, - msghdr: &libc::msghdr + &self, + msghdr: &libc::msghdr, ) -> (libc::msghdr, io::Result) { let op = Op::sendmsg_zc(&self.fd, msghdr).unwrap(); op.await From c35b357e1cbe026183358d4c2d1f162e273ac333 Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Tue, 17 Jan 2023 09:51:52 +0000 Subject: [PATCH 12/29] Update --- src/io/sendmsg_zc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index 136c75c1..9debaa44 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -21,7 +21,7 @@ impl Op { x.handle().expect("Not in a runtime context").submit_op( SendMsgZc { fd: fd.clone(), - msghdr: (*msghdr).clone(), + msghdr: (*msghdr).copy(), bytes: 0, }, |sendmsg_zc| { From 22a746fd00e12b025340126c68a9083235f7d4ad Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Tue, 17 Jan 2023 10:09:24 +0000 Subject: [PATCH 13/29] Update --- src/io/sendmsg_zc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index 9debaa44..79ac1f77 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -21,7 +21,7 @@ impl Op { x.handle().expect("Not in a runtime context").submit_op( SendMsgZc { fd: fd.clone(), - msghdr: (*msghdr).copy(), + msghdr: *msghdr, bytes: 0, }, |sendmsg_zc| { From 8b95cb48c25744def2412332926551c3d9bd23fc Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Tue, 17 Jan 2023 15:49:13 +0000 Subject: [PATCH 14/29] Modified sendmsg_zc to use Vec. --- src/io/sendmsg_zc.rs | 43 +++++++++++++++++++++++++++++++++++-------- src/io/socket.rs | 9 ++++++--- src/net/udp.rs | 10 ++++++++-- 3 files changed, 49 insertions(+), 13 deletions(-) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index 79ac1f77..faf9aa94 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -2,11 +2,18 @@ use crate::io::SharedFd; use crate::runtime::driver::op::{Completable, CqeResult, MultiCQEFuture, Op, Updateable}; use crate::runtime::CONTEXT; use std::io; +use std::io::IoSlice; +use std::net::SocketAddr; +use socket2::SockAddr; pub(crate) struct SendMsgZc { #[allow(dead_code)] fd: SharedFd, - + #[allow(dead_code)] + io_slices: Vec>, + #[allow(dead_code)] + socket_addr: Box, + msg_control: Option>, pub(crate) msghdr: libc::msghdr, /// Hold the number of transmitted bytes @@ -14,14 +21,31 @@ pub(crate) struct SendMsgZc { } impl Op { - pub(crate) fn sendmsg_zc(fd: &SharedFd, msghdr: &libc::msghdr) -> io::Result { + pub(crate) fn sendmsg_zc(fd: &SharedFd, io_slices: Vec>, socket_addr: SocketAddr, msg_control: Option>) -> io::Result { use io_uring::{opcode, types}; + let socket_addr = Box::new(SockAddr::from(socket_addr)); + + let mut msghdr: libc::msghdr; + + msghdr.msg_iov = io_slices.as_ptr() as *mut _; + msghdr.msg_iovlen = io_slices.len() as _; + msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void; + msghdr.msg_namelen = socket_addr.len(); + + match msg_control { + Some(_msg_control) => {msghdr.msg_control = _msg_control.as_ptr() as *mut _; msghdr.msg_controllen = _msg_control.len(); }, + None => { msghdr.msg_control = std::ptr::null_mut(); msghdr.msg_controllen = 0 as usize; } + } + CONTEXT.with(|x| { x.handle().expect("Not in a runtime context").submit_op( SendMsgZc { fd: fd.clone(), - msghdr: *msghdr, + io_slices, + socket_addr, + msg_control, + msghdr, bytes: 0, }, |sendmsg_zc| { @@ -37,16 +61,19 @@ impl Op { } impl Completable for SendMsgZc { - type Output = (libc::msghdr, io::Result); + type Output = (io::Result, Vec>, Option>); - fn complete(self, cqe: CqeResult) -> (libc::msghdr, io::Result) { + fn complete(self, cqe: CqeResult) -> (io::Result, Vec>, Option>) { // Convert the operation result to `usize` let res = cqe.result.map(|v| v as usize); - // Recover the msghdr. - let msghdr = self.msghdr; + // Recover the data buffers. + let io_slices = self.io_slices; + + // Recover the ancillary data buffer. + let msg_control = self.msg_control; - (msghdr, res) + (res, io_slices, msg_control) } } diff --git a/src/io/socket.rs b/src/io/socket.rs index 3c8bc1a2..11e962ae 100644 --- a/src/io/socket.rs +++ b/src/io/socket.rs @@ -6,6 +6,7 @@ use crate::{ }; use std::{ io, + io::IoSlice, net::SocketAddr, os::unix::io::{AsRawFd, IntoRawFd, RawFd}, path::Path, @@ -149,9 +150,11 @@ impl Socket { pub(crate) async fn sendmsg_zc( &self, - msghdr: &libc::msghdr, - ) -> (libc::msghdr, io::Result) { - let op = Op::sendmsg_zc(&self.fd, msghdr).unwrap(); + io_slices: Vec>, + socket_addr: SocketAddr, + msg_control: Option>, + ) -> (io::Result, Vec>, Option>) { + let op = Op::sendmsg_zc(&self.fd, io_slices, socket_addr, msg_control).unwrap(); op.await } diff --git a/src/net/udp.rs b/src/net/udp.rs index 28885af7..586d0132 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -6,6 +6,7 @@ use crate::{ use socket2::SockAddr; use std::{ io, + io::IoSlice, net::SocketAddr, os::unix::prelude::{AsRawFd, FromRawFd, RawFd}, }; @@ -221,8 +222,13 @@ impl UdpSocket { } /// Sends a message on the socket using a msghdr. - pub async fn sendmsg_zc(&self, msghdr: &libc::msghdr) -> (libc::msghdr, io::Result) { - self.inner.sendmsg_zc(msghdr).await + pub async fn sendmsg_zc( + &self, + io_slices: Vec>, + socket_addr: SocketAddr, + msg_control: Option>, + ) -> (io::Result, Vec>, Option>) { + self.inner.sendmsg_zc(io_slices, socket_addr, msg_control).await } /// Receives a single datagram message on the socket. On success, returns From ded196b101808ca1b2f130173af0164c8c04789a Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Tue, 17 Jan 2023 15:50:04 +0000 Subject: [PATCH 15/29] Formatting. --- src/io/sendmsg_zc.rs | 34 ++++++++++++++++++++++++++++------ src/io/socket.rs | 6 +++++- src/net/udp.rs | 10 ++++++++-- 3 files changed, 41 insertions(+), 9 deletions(-) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index faf9aa94..8e673859 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -1,10 +1,10 @@ use crate::io::SharedFd; use crate::runtime::driver::op::{Completable, CqeResult, MultiCQEFuture, Op, Updateable}; use crate::runtime::CONTEXT; +use socket2::SockAddr; use std::io; use std::io::IoSlice; use std::net::SocketAddr; -use socket2::SockAddr; pub(crate) struct SendMsgZc { #[allow(dead_code)] @@ -21,7 +21,12 @@ pub(crate) struct SendMsgZc { } impl Op { - pub(crate) fn sendmsg_zc(fd: &SharedFd, io_slices: Vec>, socket_addr: SocketAddr, msg_control: Option>) -> io::Result { + pub(crate) fn sendmsg_zc( + fd: &SharedFd, + io_slices: Vec>, + socket_addr: SocketAddr, + msg_control: Option>, + ) -> io::Result { use io_uring::{opcode, types}; let socket_addr = Box::new(SockAddr::from(socket_addr)); @@ -34,8 +39,14 @@ impl Op { msghdr.msg_namelen = socket_addr.len(); match msg_control { - Some(_msg_control) => {msghdr.msg_control = _msg_control.as_ptr() as *mut _; msghdr.msg_controllen = _msg_control.len(); }, - None => { msghdr.msg_control = std::ptr::null_mut(); msghdr.msg_controllen = 0 as usize; } + Some(_msg_control) => { + msghdr.msg_control = _msg_control.as_ptr() as *mut _; + msghdr.msg_controllen = _msg_control.len(); + } + None => { + msghdr.msg_control = std::ptr::null_mut(); + msghdr.msg_controllen = 0 as usize; + } } CONTEXT.with(|x| { @@ -61,9 +72,20 @@ impl Op { } impl Completable for SendMsgZc { - type Output = (io::Result, Vec>, Option>); + type Output = ( + io::Result, + Vec>, + Option>, + ); - fn complete(self, cqe: CqeResult) -> (io::Result, Vec>, Option>) { + fn complete( + self, + cqe: CqeResult, + ) -> ( + io::Result, + Vec>, + Option>, + ) { // Convert the operation result to `usize` let res = cqe.result.map(|v| v as usize); diff --git a/src/io/socket.rs b/src/io/socket.rs index 11e962ae..518e7913 100644 --- a/src/io/socket.rs +++ b/src/io/socket.rs @@ -153,7 +153,11 @@ impl Socket { io_slices: Vec>, socket_addr: SocketAddr, msg_control: Option>, - ) -> (io::Result, Vec>, Option>) { + ) -> ( + io::Result, + Vec>, + Option>, + ) { let op = Op::sendmsg_zc(&self.fd, io_slices, socket_addr, msg_control).unwrap(); op.await } diff --git a/src/net/udp.rs b/src/net/udp.rs index 586d0132..07ac978d 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -227,8 +227,14 @@ impl UdpSocket { io_slices: Vec>, socket_addr: SocketAddr, msg_control: Option>, - ) -> (io::Result, Vec>, Option>) { - self.inner.sendmsg_zc(io_slices, socket_addr, msg_control).await + ) -> ( + io::Result, + Vec>, + Option>, + ) { + self.inner + .sendmsg_zc(io_slices, socket_addr, msg_control) + .await } /// Receives a single datagram message on the socket. On success, returns From e11bfd95097ed54264bf0a6613feb0849833c2cf Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Wed, 18 Jan 2023 08:41:24 +0000 Subject: [PATCH 16/29] Update sendmsg_zc to use IoBufs instead of IoSlices. --- src/io/sendmsg_zc.rs | 40 ++++++++++++++++++++-------------------- src/io/socket.rs | 11 +++++------ src/net/udp.rs | 13 ++++++------- 3 files changed, 31 insertions(+), 33 deletions(-) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index 8e673859..54447a3e 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -1,31 +1,31 @@ use crate::io::SharedFd; +use crate::buf::IoBuf; use crate::runtime::driver::op::{Completable, CqeResult, MultiCQEFuture, Op, Updateable}; use crate::runtime::CONTEXT; use socket2::SockAddr; use std::io; -use std::io::IoSlice; use std::net::SocketAddr; -pub(crate) struct SendMsgZc { +pub(crate) struct SendMsgZc { #[allow(dead_code)] fd: SharedFd, #[allow(dead_code)] - io_slices: Vec>, + io_bufs: Vec, #[allow(dead_code)] socket_addr: Box, - msg_control: Option>, + msg_control: Option, pub(crate) msghdr: libc::msghdr, /// Hold the number of transmitted bytes bytes: usize, } -impl Op { +impl Op, MultiCQEFuture> { pub(crate) fn sendmsg_zc( fd: &SharedFd, - io_slices: Vec>, + io_bufs: Vec, socket_addr: SocketAddr, - msg_control: Option>, + msg_control: Option, ) -> io::Result { use io_uring::{opcode, types}; @@ -33,15 +33,15 @@ impl Op { let mut msghdr: libc::msghdr; - msghdr.msg_iov = io_slices.as_ptr() as *mut _; - msghdr.msg_iovlen = io_slices.len() as _; + msghdr.msg_iov = io_bufs.as_ptr() as *mut _; + msghdr.msg_iovlen = io_bufs.len() as _; msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void; msghdr.msg_namelen = socket_addr.len(); match msg_control { Some(_msg_control) => { - msghdr.msg_control = _msg_control.as_ptr() as *mut _; - msghdr.msg_controllen = _msg_control.len(); + msghdr.msg_control = _msg_control.stable_ptr() as *mut _; + msghdr.msg_controllen = _msg_control.bytes_init(); } None => { msghdr.msg_control = std::ptr::null_mut(); @@ -53,7 +53,7 @@ impl Op { x.handle().expect("Not in a runtime context").submit_op( SendMsgZc { fd: fd.clone(), - io_slices, + io_bufs, socket_addr, msg_control, msghdr, @@ -71,11 +71,11 @@ impl Op { } } -impl Completable for SendMsgZc { +impl Completable for SendMsgZc { type Output = ( io::Result, - Vec>, - Option>, + Vec, + Option, ); fn complete( @@ -83,23 +83,23 @@ impl Completable for SendMsgZc { cqe: CqeResult, ) -> ( io::Result, - Vec>, - Option>, + Vec, + Option, ) { // Convert the operation result to `usize` let res = cqe.result.map(|v| v as usize); // Recover the data buffers. - let io_slices = self.io_slices; + let io_bufs = self.io_bufs; // Recover the ancillary data buffer. let msg_control = self.msg_control; - (res, io_slices, msg_control) + (res, io_bufs, msg_control) } } -impl Updateable for SendMsgZc { +impl Updateable for SendMsgZc { fn update(&mut self, cqe: CqeResult) { // uring send_zc promises there will be no error on CQE's marked more self.bytes += *cqe.result.as_ref().unwrap() as usize; diff --git a/src/io/socket.rs b/src/io/socket.rs index 518e7913..2fd6ebae 100644 --- a/src/io/socket.rs +++ b/src/io/socket.rs @@ -6,7 +6,6 @@ use crate::{ }; use std::{ io, - io::IoSlice, net::SocketAddr, os::unix::io::{AsRawFd, IntoRawFd, RawFd}, path::Path, @@ -148,15 +147,15 @@ impl Socket { op.await } - pub(crate) async fn sendmsg_zc( + pub(crate) async fn sendmsg_zc( &self, - io_slices: Vec>, + io_slices: Vec, socket_addr: SocketAddr, - msg_control: Option>, + msg_control: Option, ) -> ( io::Result, - Vec>, - Option>, + Vec, + Option, ) { let op = Op::sendmsg_zc(&self.fd, io_slices, socket_addr, msg_control).unwrap(); op.await diff --git a/src/net/udp.rs b/src/net/udp.rs index 07ac978d..65d43e32 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -1,12 +1,11 @@ use crate::{ buf::fixed::FixedBuf, - buf::{BoundedBuf, BoundedBufMut}, + buf::{BoundedBuf, BoundedBufMut, IoBuf}, io::{SharedFd, Socket}, }; use socket2::SockAddr; use std::{ io, - io::IoSlice, net::SocketAddr, os::unix::prelude::{AsRawFd, FromRawFd, RawFd}, }; @@ -222,15 +221,15 @@ impl UdpSocket { } /// Sends a message on the socket using a msghdr. - pub async fn sendmsg_zc( + pub async fn sendmsg_zc( &self, - io_slices: Vec>, + io_slices: Vec, socket_addr: SocketAddr, - msg_control: Option>, + msg_control: Option, ) -> ( io::Result, - Vec>, - Option>, + Vec, + Option, ) { self.inner .sendmsg_zc(io_slices, socket_addr, msg_control) From 690a82b524870e227be622da5feff5bf2e83ba37 Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Wed, 18 Jan 2023 08:44:01 +0000 Subject: [PATCH 17/29] Update formatting. --- src/io/sendmsg_zc.rs | 19 ++++--------------- src/io/socket.rs | 6 +----- src/net/udp.rs | 6 +----- 3 files changed, 6 insertions(+), 25 deletions(-) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index 54447a3e..19bf86c1 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -1,5 +1,5 @@ -use crate::io::SharedFd; use crate::buf::IoBuf; +use crate::io::SharedFd; use crate::runtime::driver::op::{Completable, CqeResult, MultiCQEFuture, Op, Updateable}; use crate::runtime::CONTEXT; use socket2::SockAddr; @@ -14,7 +14,7 @@ pub(crate) struct SendMsgZc { #[allow(dead_code)] socket_addr: Box, msg_control: Option, - pub(crate) msghdr: libc::msghdr, + msghdr: libc::msghdr, /// Hold the number of transmitted bytes bytes: usize, @@ -72,20 +72,9 @@ impl Op, MultiCQEFuture> { } impl Completable for SendMsgZc { - type Output = ( - io::Result, - Vec, - Option, - ); + type Output = (io::Result, Vec, Option); - fn complete( - self, - cqe: CqeResult, - ) -> ( - io::Result, - Vec, - Option, - ) { + fn complete(self, cqe: CqeResult) -> (io::Result, Vec, Option) { // Convert the operation result to `usize` let res = cqe.result.map(|v| v as usize); diff --git a/src/io/socket.rs b/src/io/socket.rs index 2fd6ebae..6f914948 100644 --- a/src/io/socket.rs +++ b/src/io/socket.rs @@ -152,11 +152,7 @@ impl Socket { io_slices: Vec, socket_addr: SocketAddr, msg_control: Option, - ) -> ( - io::Result, - Vec, - Option, - ) { + ) -> (io::Result, Vec, Option) { let op = Op::sendmsg_zc(&self.fd, io_slices, socket_addr, msg_control).unwrap(); op.await } diff --git a/src/net/udp.rs b/src/net/udp.rs index 65d43e32..804584d6 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -226,11 +226,7 @@ impl UdpSocket { io_slices: Vec, socket_addr: SocketAddr, msg_control: Option, - ) -> ( - io::Result, - Vec, - Option, - ) { + ) -> (io::Result, Vec, Option) { self.inner .sendmsg_zc(io_slices, socket_addr, msg_control) .await From 14ab94aac989fda2209cc5c8331e9cab21655173 Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Wed, 18 Jan 2023 09:03:05 +0000 Subject: [PATCH 18/29] Update --- Cargo.toml | 2 +- src/io/sendmsg_zc.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d5e2e246..2b07ffd7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,4 +50,4 @@ path = "benches/criterion/no_op.rs" harness = false [patch.crates-io] -io-uring = { git = 'https://github.com/tokio-rs/io-uring', branch = 'master' } \ No newline at end of file +io-uring = { git = 'https://github.com/tokio-rs/io-uring', branch = 'master' } diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index 19bf86c1..4a0dca6f 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -31,7 +31,7 @@ impl Op, MultiCQEFuture> { let socket_addr = Box::new(SockAddr::from(socket_addr)); - let mut msghdr: libc::msghdr; + let mut msghdr: libc::msghdr = unsafe { std::mem::zeroed() }; msghdr.msg_iov = io_bufs.as_ptr() as *mut _; msghdr.msg_iovlen = io_bufs.len() as _; @@ -39,7 +39,7 @@ impl Op, MultiCQEFuture> { msghdr.msg_namelen = socket_addr.len(); match msg_control { - Some(_msg_control) => { + Some(ref _msg_control) => { msghdr.msg_control = _msg_control.stable_ptr() as *mut _; msghdr.msg_controllen = _msg_control.bytes_init(); } From 9c50c8bfb2a2aaf6b02743d3ad8a42e459b09cc6 Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Wed, 18 Jan 2023 09:05:13 +0000 Subject: [PATCH 19/29] Update --- src/io/sendmsg_zc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index 4a0dca6f..501eb302 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -45,7 +45,7 @@ impl Op, MultiCQEFuture> { } None => { msghdr.msg_control = std::ptr::null_mut(); - msghdr.msg_controllen = 0 as usize; + msghdr.msg_controllen = 0_usize; } } From aea1040d3b40c224c642e0568340c60c13f7500f Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Wed, 18 Jan 2023 12:52:27 +0000 Subject: [PATCH 20/29] Update --- src/io/sendmsg_zc.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index 501eb302..f44001e4 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -4,6 +4,7 @@ use crate::runtime::driver::op::{Completable, CqeResult, MultiCQEFuture, Op, Upd use crate::runtime::CONTEXT; use socket2::SockAddr; use std::io; +use std::io::IoSlice; use std::net::SocketAddr; pub(crate) struct SendMsgZc { @@ -33,8 +34,16 @@ impl Op, MultiCQEFuture> { let mut msghdr: libc::msghdr = unsafe { std::mem::zeroed() }; - msghdr.msg_iov = io_bufs.as_ptr() as *mut _; - msghdr.msg_iovlen = io_bufs.len() as _; + let io_slices: Vec = Vec::new(); + + for io_buf in io_bufs { + io_slices.push(IoSlice::new(unsafe { + std::slice::from_raw_parts(io_buf.stable_ptr(), io_buf.bytes_init()) + })) + } + + msghdr.msg_iov = io_slices.as_ptr() as *mut _; + msghdr.msg_iovlen = io_slices.len() as _; msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void; msghdr.msg_namelen = socket_addr.len(); From c95cef5590f43e55664e3d562da2901817f4a1ff Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Wed, 18 Jan 2023 12:55:24 +0000 Subject: [PATCH 21/29] Update --- src/io/sendmsg_zc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index f44001e4..29c104db 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -34,7 +34,7 @@ impl Op, MultiCQEFuture> { let mut msghdr: libc::msghdr = unsafe { std::mem::zeroed() }; - let io_slices: Vec = Vec::new(); + let mut io_slices: Vec = Vec::new(); for io_buf in io_bufs { io_slices.push(IoSlice::new(unsafe { From 4058a1fb271812649a5b2ecae3a058f76bcc50ff Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Wed, 18 Jan 2023 12:57:54 +0000 Subject: [PATCH 22/29] Update --- src/io/sendmsg_zc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index 29c104db..f1f45c47 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -36,7 +36,7 @@ impl Op, MultiCQEFuture> { let mut io_slices: Vec = Vec::new(); - for io_buf in io_bufs { + for io_buf in &io_bufs { io_slices.push(IoSlice::new(unsafe { std::slice::from_raw_parts(io_buf.stable_ptr(), io_buf.bytes_init()) })) From 80e3abfc47106d790e5759e570a5614c47425e69 Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Wed, 18 Jan 2023 13:51:45 +0000 Subject: [PATCH 23/29] Give msg_control a seperate type parameter. --- src/io/sendmsg_zc.rs | 16 ++++++++-------- src/io/socket.rs | 6 +++--- src/net/udp.rs | 6 +++--- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index f1f45c47..7795dafe 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -7,26 +7,26 @@ use std::io; use std::io::IoSlice; use std::net::SocketAddr; -pub(crate) struct SendMsgZc { +pub(crate) struct SendMsgZc { #[allow(dead_code)] fd: SharedFd, #[allow(dead_code)] io_bufs: Vec, #[allow(dead_code)] socket_addr: Box, - msg_control: Option, + msg_control: Option, msghdr: libc::msghdr, /// Hold the number of transmitted bytes bytes: usize, } -impl Op, MultiCQEFuture> { +impl Op, MultiCQEFuture> { pub(crate) fn sendmsg_zc( fd: &SharedFd, io_bufs: Vec, socket_addr: SocketAddr, - msg_control: Option, + msg_control: Option, ) -> io::Result { use io_uring::{opcode, types}; @@ -80,10 +80,10 @@ impl Op, MultiCQEFuture> { } } -impl Completable for SendMsgZc { - type Output = (io::Result, Vec, Option); +impl Completable for SendMsgZc { + type Output = (io::Result, Vec, Option); - fn complete(self, cqe: CqeResult) -> (io::Result, Vec, Option) { + fn complete(self, cqe: CqeResult) -> (io::Result, Vec, Option) { // Convert the operation result to `usize` let res = cqe.result.map(|v| v as usize); @@ -97,7 +97,7 @@ impl Completable for SendMsgZc { } } -impl Updateable for SendMsgZc { +impl Updateable for SendMsgZc { fn update(&mut self, cqe: CqeResult) { // uring send_zc promises there will be no error on CQE's marked more self.bytes += *cqe.result.as_ref().unwrap() as usize; diff --git a/src/io/socket.rs b/src/io/socket.rs index 6f914948..ce192e87 100644 --- a/src/io/socket.rs +++ b/src/io/socket.rs @@ -147,12 +147,12 @@ impl Socket { op.await } - pub(crate) async fn sendmsg_zc( + pub(crate) async fn sendmsg_zc( &self, io_slices: Vec, socket_addr: SocketAddr, - msg_control: Option, - ) -> (io::Result, Vec, Option) { + msg_control: Option, + ) -> (io::Result, Vec, Option) { let op = Op::sendmsg_zc(&self.fd, io_slices, socket_addr, msg_control).unwrap(); op.await } diff --git a/src/net/udp.rs b/src/net/udp.rs index 804584d6..816ea512 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -221,12 +221,12 @@ impl UdpSocket { } /// Sends a message on the socket using a msghdr. - pub async fn sendmsg_zc( + pub async fn sendmsg_zc( &self, io_slices: Vec, socket_addr: SocketAddr, - msg_control: Option, - ) -> (io::Result, Vec, Option) { + msg_control: Option, + ) -> (io::Result, Vec, Option) { self.inner .sendmsg_zc(io_slices, socket_addr, msg_control) .await From 2d3372ceba6d3318e544d04581e76fb2395aedd6 Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Wed, 18 Jan 2023 13:55:26 +0000 Subject: [PATCH 24/29] Update --- src/io/sendmsg_zc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index 7795dafe..a8537fa9 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -34,7 +34,7 @@ impl Op, MultiCQEFuture> { let mut msghdr: libc::msghdr = unsafe { std::mem::zeroed() }; - let mut io_slices: Vec = Vec::new(); + let mut io_slices: Vec = Vec::with_capacity(io_bufs.len()); for io_buf in &io_bufs { io_slices.push(IoSlice::new(unsafe { From dc300ea0860ef97bde2dcec45cf1201e34789576 Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Fri, 20 Jan 2023 09:15:28 +0000 Subject: [PATCH 25/29] Update io-uring dependency version. --- Cargo.toml | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2b07ffd7..b86f4f19 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ keywords = ["async", "fs", "io-uring"] tokio = { version = "1.2", features = ["net", "rt"] } slab = "0.4.2" libc = "0.2.80" -io-uring = { version = "0.5.9", features = ["unstable"] } +io-uring = { version = "0.5.12", features = ["unstable"] } socket2 = { version = "0.4.4", features = ["all"] } bytes = { version = "1.0", optional = true } @@ -48,6 +48,3 @@ harness = false name = "criterion_no_op" path = "benches/criterion/no_op.rs" harness = false - -[patch.crates-io] -io-uring = { git = 'https://github.com/tokio-rs/io-uring', branch = 'master' } From dce16f074bc42bbbc2d6d55c274419bda26963d9 Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Fri, 20 Jan 2023 09:15:51 +0000 Subject: [PATCH 26/29] Add self.bytes to completion result. --- src/io/sendmsg_zc.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index a8537fa9..6aa3dfa4 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -87,6 +87,12 @@ impl Completable for SendMsgZc { // Convert the operation result to `usize` let res = cqe.result.map(|v| v as usize); + // Add the number of bytes to res. + match res { + Ok(_res) => res = Ok(_res + self.bytes), + _ => (), + } + // Recover the data buffers. let io_bufs = self.io_bufs; From af871063b94bd4ed00cf27a0496e0c1b594964e8 Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Fri, 20 Jan 2023 09:20:17 +0000 Subject: [PATCH 27/29] Update --- src/io/sendmsg_zc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index 6aa3dfa4..c3c04168 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -85,7 +85,7 @@ impl Completable for SendMsgZc { fn complete(self, cqe: CqeResult) -> (io::Result, Vec, Option) { // Convert the operation result to `usize` - let res = cqe.result.map(|v| v as usize); + let mut res = cqe.result.map(|v| v as usize); // Add the number of bytes to res. match res { From 13310e151cb59d13cda82b9ffb8ea52a2f4ededa Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Fri, 20 Jan 2023 09:25:33 +0000 Subject: [PATCH 28/29] Update --- src/io/sendmsg_zc.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index c3c04168..2400ebe1 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -88,9 +88,8 @@ impl Completable for SendMsgZc { let mut res = cqe.result.map(|v| v as usize); // Add the number of bytes to res. - match res { - Ok(_res) => res = Ok(_res + self.bytes), - _ => (), + if let Ok(_res) = res { + res = Ok(_res + self.bytes) } // Recover the data buffers. From b99887a7056d1319def6653307db56614dd53ec0 Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Fri, 20 Jan 2023 10:19:06 +0000 Subject: [PATCH 29/29] Update --- src/io/sendmsg_zc.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index 2400ebe1..c5b94a4b 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -84,13 +84,8 @@ impl Completable for SendMsgZc { type Output = (io::Result, Vec, Option); fn complete(self, cqe: CqeResult) -> (io::Result, Vec, Option) { - // Convert the operation result to `usize` - let mut res = cqe.result.map(|v| v as usize); - - // Add the number of bytes to res. - if let Ok(_res) = res { - res = Ok(_res + self.bytes) - } + // Convert the operation result to `usize`, and add previous byte count + let res = cqe.result.map(|v| self.bytes + v as usize); // Recover the data buffers. let io_bufs = self.io_bufs;