From 94e49a2ff37981e8127b43044f87daa5614fe0a0 Mon Sep 17 00:00:00 2001 From: Thomas Barrett Date: Thu, 12 Jan 2023 19:04:53 -0800 Subject: [PATCH 01/20] Add sqe128, cqe32, and uring-cmd features --- Cargo.toml | 8 ++++ src/fs/file.rs | 14 +++++++ src/io/mod.rs | 3 ++ src/io/uring_cmd.rs | 81 ++++++++++++++++++++++++++++++++++++ src/lib.rs | 11 ++--- src/runtime/driver/handle.rs | 8 ++-- src/runtime/driver/mod.rs | 30 ++++++++++--- src/runtime/driver/op/mod.rs | 13 ++++++ tests/fs_file.rs | 22 ++++++++++ 9 files changed, 176 insertions(+), 14 deletions(-) create mode 100644 src/io/uring_cmd.rs diff --git a/Cargo.toml b/Cargo.toml index be1976e5..c3ff1534 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,14 @@ keywords = ["async", "fs", "io-uring"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +# Enable 128-byte submission queue entries. This requires Linux >= 5.19. +sqe128 = [] +# Enable 32-byte completion queue entries. This requires Linux >= 5.19. +cqe32 = [] +# Enable support for uring-cmd. This requires Linux >= 5.19. +uring-cmd = [] + [dependencies] tokio = { version = "1.2", features = ["net", "rt"] } slab = "0.4.2" diff --git a/src/fs/file.rs b/src/fs/file.rs index 5b6265f5..d0db1170 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -181,6 +181,20 @@ impl File { op.await } + #[cfg(all(feature = "uring-cmd"))] + /// A file/device-specific 16-byte command, akin (but not equivalent) to ioctl(2). + pub async fn uring_cmd16(&self, cmd_op: u32, cmd: [u8; 16]) -> io::Result { + let op = Op::uring_cmd16(&self.fd, cmd_op, cmd).unwrap(); + op.await + } + + #[cfg(all(feature = "uring-cmd", feature = "sqe128"))] + /// A file/device-specific 80-byte command, akin (but not equivalent) to ioctl(2). + pub async fn uring_cmd80(&self, cmd_op: u32, cmd: [u8; 80]) -> io::Result { + let op = Op::uring_cmd80(&self.fd, cmd_op, cmd).unwrap(); + op.await + } + /// Read some bytes at the specified offset from the file into the specified /// array of buffers, returning how many bytes were read. /// diff --git a/src/io/mod.rs b/src/io/mod.rs index c92687ed..fe5439da 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -43,3 +43,6 @@ mod write; mod write_fixed; mod writev; + +#[cfg(feature = "uring-cmd")] +mod uring_cmd; diff --git a/src/io/uring_cmd.rs b/src/io/uring_cmd.rs new file mode 100644 index 00000000..6e69dff1 --- /dev/null +++ b/src/io/uring_cmd.rs @@ -0,0 +1,81 @@ +use crate::io::SharedFd; +use crate::runtime::driver::op::{Completable, CqeResult, Op}; +use crate::runtime::CONTEXT; +use std::io; + +pub(crate) struct UringCmd16 { + /// Holds a strong ref to the FD, preventing the file from being closed + /// while the operation is in-flight. + #[allow(dead_code)] + fd: SharedFd, +} + +impl Op { + /// A file/device-specific 16-byte command, akin (but not equivalent) to ioctl + pub(crate) fn uring_cmd16( + fd: &SharedFd, + cmd_op: u32, + cmd: [u8; 16], + ) -> io::Result> { + use io_uring::{opcode, types}; + + CONTEXT.with(|x| { + x.handle().expect("Not in a runtime context").submit_op( + UringCmd16 { fd: fd.clone() }, + |_| { + opcode::UringCmd16::new(types::Fd(fd.raw_fd()), cmd_op) + .cmd(cmd) + .build() + }, + ) + }) + } +} + +impl Completable for UringCmd16 { + type Output = io::Result; + + fn complete(self, cqe: CqeResult) -> Self::Output { + cqe.result + } +} + +#[cfg(feature = "sqe128")] +pub(crate) struct UringCmd80 { + /// Holds a strong ref to the FD, preventing the file from being closed + /// while the operation is in-flight. + #[allow(dead_code)] + fd: SharedFd, +} + +#[cfg(feature = "sqe128")] +impl Op { + /// A file/device-specific 80-byte command, akin (but not equivalent) to ioctl + pub(crate) fn uring_cmd80( + fd: &SharedFd, + cmd_op: u32, + cmd: [u8; 80], + ) -> io::Result> { + use io_uring::{opcode, types}; + + CONTEXT.with(|x| { + x.handle().expect("Not in a runtime context").submit_op( + UringCmd80 { fd: fd.clone() }, + |_| { + opcode::UringCmd80::new(types::Fd(fd.raw_fd()), cmd_op) + .cmd(cmd) + .build() + }, + ) + }) + } +} + +#[cfg(feature = "sqe128")] +impl Completable for UringCmd80 { + type Output = io::Result; + + fn complete(self, cqe: CqeResult) -> Self::Output { + cqe.result + } +} diff --git a/src/lib.rs b/src/lib.rs index e775e5c4..50ea185a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -82,6 +82,7 @@ pub use runtime::spawn; pub use runtime::Runtime; use crate::runtime::driver::op::Op; +use crate::runtime::driver::{CEntry, SEntry}; use std::future::Future; /// Start an `io_uring` enabled Tokio runtime. @@ -152,8 +153,8 @@ pub fn start(future: F) -> F::Output { /// This function is provided to avoid requiring the user of this crate from /// having to use the io_uring crate as well. Refer to Builder::start example /// for its intended usage. -pub fn uring_builder() -> io_uring::Builder { - io_uring::IoUring::builder() +pub fn uring_builder() -> io_uring::Builder { + io_uring::IoUring::generic_builder() } /// Builder API to allow starting the runtime and creating the io_uring driver with non-default @@ -161,7 +162,7 @@ pub fn uring_builder() -> io_uring::Builder { // #[derive(Clone, Default)] pub struct Builder { entries: u32, - urb: io_uring::Builder, + urb: io_uring::Builder, } /// Return a Builder to allow setting parameters before calling the start method. @@ -171,7 +172,7 @@ pub struct Builder { pub fn builder() -> Builder { Builder { entries: 256, - urb: io_uring::IoUring::builder(), + urb: io_uring::IoUring::generic_builder(), } } @@ -194,7 +195,7 @@ impl Builder { /// /// Refer to the Builder start method for an example. /// Refer to the io_uring::builder documentation for all the supported methods. - pub fn uring_builder(&mut self, b: &io_uring::Builder) -> &mut Self { + pub fn uring_builder(&mut self, b: &io_uring::Builder) -> &mut Self { self.urb = b.clone(); self } diff --git a/src/runtime/driver/handle.rs b/src/runtime/driver/handle.rs index ab3dcc51..7653a545 100644 --- a/src/runtime/driver/handle.rs +++ b/src/runtime/driver/handle.rs @@ -12,7 +12,6 @@ //! The weak handle should be used by anything which is stored in the driver or does not need to //! keep the driver alive for it's duration. -use io_uring::squeue; use std::cell::RefCell; use std::io; use std::ops::Deref; @@ -22,7 +21,7 @@ use std::task::{Context, Poll}; use crate::buf::fixed::FixedBuffers; use crate::runtime::driver::op::{Completable, MultiCQEFuture, Op, Updateable}; -use crate::runtime::driver::Driver; +use crate::runtime::driver::{Driver, SEntry}; #[derive(Clone)] pub(crate) struct Handle { @@ -63,10 +62,11 @@ impl Handle { self.inner.borrow_mut().unregister_buffers(buffers) } - pub(crate) fn submit_op(&self, data: T, f: F) -> io::Result> + pub(crate) fn submit_op(&self, data: T, f: F) -> io::Result> where T: Completable, - F: FnOnce(&mut T) -> squeue::Entry, + A: Into, + F: FnOnce(&mut T) -> A, { self.inner.borrow_mut().submit_op(data, f, self.into()) } diff --git a/src/runtime/driver/mod.rs b/src/runtime/driver/mod.rs index ab80624b..8f23f669 100644 --- a/src/runtime/driver/mod.rs +++ b/src/runtime/driver/mod.rs @@ -11,6 +11,16 @@ use std::task::{Context, Poll}; pub(crate) use handle::*; +#[cfg(not(feature = "sqe128"))] +pub(crate) type SEntry = squeue::Entry; +#[cfg(feature = "sqe128")] +pub(crate) type SEntry = squeue::Entry128; + +#[cfg(not(feature = "cqe32"))] +pub(crate) type CEntry = cqueue::Entry; +#[cfg(feature = "cqe32")] +pub(crate) type CEntry = cqueue::Entry32; + mod handle; pub(crate) mod op; @@ -19,7 +29,7 @@ pub(crate) struct Driver { ops: Ops, /// IoUring bindings - uring: IoUring, + uring: IoUring, /// Reference to the currently registered buffers. /// Ensures that the buffers are not dropped until @@ -122,7 +132,7 @@ impl Driver { )) } - pub(crate) fn submit_op( + pub(crate) fn submit_op( &mut self, mut data: T, f: F, @@ -130,12 +140,14 @@ impl Driver { ) -> io::Result> where T: Completable, - F: FnOnce(&mut T) -> squeue::Entry, + A: Into, + F: FnOnce(&mut T) -> A, { let index = self.ops.insert(); // Configure the SQE - let sqe = f(&mut data).user_data(index as _); + let sqe: SEntry = f(&mut data).into(); + let sqe = sqe.user_data(index as _); // Create the operation let op = Op::new(handle, data, index); @@ -343,7 +355,15 @@ impl Drop for Driver { while self .uring .submission() - .push(&AsyncCancel::new(id as u64).build().user_data(u64::MAX)) + .push( + // Conversion to configured squeue::EntryMarker is useless when + // sqe128 feature is disabled. + #[allow(clippy::useless_conversion)] + &AsyncCancel::new(id as u64) + .build() + .user_data(u64::MAX) + .into(), + ) .is_err() { self.uring diff --git a/src/runtime/driver/op/mod.rs b/src/runtime/driver/op/mod.rs index 5758a29d..7885c940 100644 --- a/src/runtime/driver/op/mod.rs +++ b/src/runtime/driver/op/mod.rs @@ -90,6 +90,19 @@ impl From for CqeResult { } } +impl From for CqeResult { + fn from(cqe: cqueue::Entry32) -> Self { + let res = cqe.result(); + let flags = cqe.flags(); + let result = if res >= 0 { + Ok(res as u32) + } else { + Err(io::Error::from_raw_os_error(-res)) + }; + CqeResult { result, flags } + } +} + impl Op { /// Create a new operation pub(super) fn new(driver: driver::WeakHandle, data: T, index: usize) -> Self { diff --git a/tests/fs_file.rs b/tests/fs_file.rs index a5e968d0..c9e6b6f3 100644 --- a/tests/fs_file.rs +++ b/tests/fs_file.rs @@ -24,6 +24,28 @@ async fn read_hello(file: &File) { assert_eq!(&buf[..n], HELLO); } +#[cfg(all(feature = "uring-cmd"))] +#[test] +fn uring_cmd16() { + tokio_uring::start(async { + let file = File::open("/dev/null").await.unwrap(); + let res = file.uring_cmd16(0, [0x00; 16]).await.unwrap(); + + assert_eq!(res, 0); + }); +} + +#[cfg(all(feature = "uring-cmd", feature = "sqe128"))] +#[test] +fn uring_cmd80() { + tokio_uring::start(async { + let file = File::open("/dev/null").await.unwrap(); + let res = file.uring_cmd80(0, [0x00; 80]).await.unwrap(); + + assert_eq!(res, 0); + }); +} + #[test] fn basic_read() { tokio_uring::start(async { From 43fc143f2d8710b97759ccf439244d1b3668e328 Mon Sep 17 00:00:00 2001 From: Thomas Barrett Date: Sun, 5 Feb 2023 10:09:01 -0800 Subject: [PATCH 02/20] Remove uring-cmd feature --- Cargo.toml | 2 -- src/fs/file.rs | 3 +-- src/io/mod.rs | 1 - tests/fs_file.rs | 3 +-- 4 files changed, 2 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c3ff1534..72900b69 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,8 +21,6 @@ keywords = ["async", "fs", "io-uring"] sqe128 = [] # Enable 32-byte completion queue entries. This requires Linux >= 5.19. cqe32 = [] -# Enable support for uring-cmd. This requires Linux >= 5.19. -uring-cmd = [] [dependencies] tokio = { version = "1.2", features = ["net", "rt"] } diff --git a/src/fs/file.rs b/src/fs/file.rs index d0db1170..fba404ed 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -181,14 +181,13 @@ impl File { op.await } - #[cfg(all(feature = "uring-cmd"))] /// A file/device-specific 16-byte command, akin (but not equivalent) to ioctl(2). pub async fn uring_cmd16(&self, cmd_op: u32, cmd: [u8; 16]) -> io::Result { let op = Op::uring_cmd16(&self.fd, cmd_op, cmd).unwrap(); op.await } - #[cfg(all(feature = "uring-cmd", feature = "sqe128"))] + #[cfg(feature = "sqe128")] /// A file/device-specific 80-byte command, akin (but not equivalent) to ioctl(2). pub async fn uring_cmd80(&self, cmd_op: u32, cmd: [u8; 80]) -> io::Result { let op = Op::uring_cmd80(&self.fd, cmd_op, cmd).unwrap(); diff --git a/src/io/mod.rs b/src/io/mod.rs index fe5439da..8c793a3e 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -44,5 +44,4 @@ mod write_fixed; mod writev; -#[cfg(feature = "uring-cmd")] mod uring_cmd; diff --git a/tests/fs_file.rs b/tests/fs_file.rs index c9e6b6f3..1b4c0787 100644 --- a/tests/fs_file.rs +++ b/tests/fs_file.rs @@ -24,7 +24,6 @@ async fn read_hello(file: &File) { assert_eq!(&buf[..n], HELLO); } -#[cfg(all(feature = "uring-cmd"))] #[test] fn uring_cmd16() { tokio_uring::start(async { @@ -35,7 +34,7 @@ fn uring_cmd16() { }); } -#[cfg(all(feature = "uring-cmd", feature = "sqe128"))] +#[cfg(feature = "sqe128")] #[test] fn uring_cmd80() { tokio_uring::start(async { From 6125abb5e7e69f20f7dae38521ff10befe4b35ef Mon Sep 17 00:00:00 2001 From: Thomas Barrett Date: Sun, 5 Feb 2023 15:19:34 -0800 Subject: [PATCH 03/20] Only run uring_cmd16 test if opcode::UringCmd16 is supported on test runner --- tests/fs_file.rs | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/tests/fs_file.rs b/tests/fs_file.rs index 2d391389..1e31a85f 100644 --- a/tests/fs_file.rs +++ b/tests/fs_file.rs @@ -11,6 +11,8 @@ use tokio_uring::buf::fixed::FixedBufRegistry; use tokio_uring::buf::{BoundedBuf, BoundedBufMut}; use tokio_uring::fs::File; +use io_uring::{IoUring, Probe}; + #[path = "../src/future.rs"] #[allow(warnings)] mod future; @@ -28,12 +30,20 @@ async fn read_hello(file: &File) { #[test] fn uring_cmd16() { - tokio_uring::start(async { - let file = File::open("/dev/null").await.unwrap(); - let res = file.uring_cmd16(0, [0x00; 16]).await.unwrap(); - - assert_eq!(res, 0); - }); + // Check that io_uring::opcode::UringCmd16 is supported on the running kernel. + let entries = 8; + let ring = IoUring::new(entries).unwrap(); + let probe = Probe::new(); + ring.submitter().register_probe(&mut probe).unwrap(); + if probe.is_supported(io_uring::opcode::UringCmd16) { + tokio_uring::start(async { + io_uring.submitter().register_probe(&mut probe)?; + let file = File::open("/dev/null").await.unwrap(); + let res = file.uring_cmd16(0, [0x00; 16]).await.unwrap(); + + assert_eq!(res, 0); + }); + } } #[cfg(feature = "sqe128")] From 805a2654d84db74b402f7819f525d26b27da1fbd Mon Sep 17 00:00:00 2001 From: Thomas Barrett Date: Sun, 5 Feb 2023 19:46:22 -0800 Subject: [PATCH 04/20] Fix formatting in tests --- tests/fs_file.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/fs_file.rs b/tests/fs_file.rs index 8387ee89..93bd77f2 100644 --- a/tests/fs_file.rs +++ b/tests/fs_file.rs @@ -39,7 +39,6 @@ fn uring_cmd16() { tokio_uring::start(async { let file = File::open("/dev/null").await.unwrap(); let res = file.uring_cmd16(0, [0x00; 16]).await.unwrap(); - assert_eq!(res, 0); }); } From 52b29de6974f3b3283ac7270e2ba053511c5f7b2 Mon Sep 17 00:00:00 2001 From: ollie-etl <72926894+ollie-etl@users.noreply.github.com> Date: Mon, 20 Mar 2023 19:52:15 +0000 Subject: [PATCH 05/20] Box libc::msghdr in SendMesgZc This is required to ensure the msghdr is stable over the lifetime of the operation --- src/io/sendmsg_zc.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index 4a52429d..e7da9fe5 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -17,7 +17,7 @@ pub(crate) struct SendMsgZc { #[allow(dead_code)] socket_addr: Option>, msg_control: Option, - msghdr: libc::msghdr, + msghdr: Box, /// Hold the number of transmitted bytes bytes: usize, @@ -32,7 +32,7 @@ impl Op, MultiCQEFuture> { ) -> io::Result { use io_uring::{opcode, types}; - let mut msghdr: libc::msghdr = unsafe { std::mem::zeroed() }; + let mut msghdr: Box = Box::new(unsafe { std::mem::zeroed() }); let mut io_slices: Vec> = Vec::with_capacity(io_bufs.len()); @@ -84,7 +84,7 @@ impl Op, MultiCQEFuture> { |sendmsg_zc| { opcode::SendMsgZc::new( types::Fd(sendmsg_zc.fd.raw_fd()), - &sendmsg_zc.msghdr as *const _, + sendmsg_zc.msghdr.as_mut() as *const _, ) .build() }, From 0d2421723f0b4946a8da1d0dba98e3cf0ca62327 Mon Sep 17 00:00:00 2001 From: ollie-etl <72926894+ollie-etl@users.noreply.github.com> Date: Tue, 4 Jul 2023 13:35:31 +0100 Subject: [PATCH 06/20] Remove check on shrinking in set_init --- src/buf/fixed/handle.rs | 4 +--- src/buf/io_buf_mut.rs | 8 ++------ 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/src/buf/fixed/handle.rs b/src/buf/fixed/handle.rs index 5e77c125..e8f910d6 100644 --- a/src/buf/fixed/handle.rs +++ b/src/buf/fixed/handle.rs @@ -82,9 +82,7 @@ unsafe impl IoBufMut for FixedBuf { } unsafe fn set_init(&mut self, pos: usize) { - if self.buf.init_len < pos { - self.buf.init_len = pos - } + self.buf.init_len = pos } } diff --git a/src/buf/io_buf_mut.rs b/src/buf/io_buf_mut.rs index ea1e19c8..c3b2acc4 100644 --- a/src/buf/io_buf_mut.rs +++ b/src/buf/io_buf_mut.rs @@ -40,9 +40,7 @@ unsafe impl IoBufMut for Vec { } unsafe fn set_init(&mut self, init_len: usize) { - if self.len() < init_len { - self.set_len(init_len); - } + self.set_len(init_len); } } @@ -53,8 +51,6 @@ unsafe impl IoBufMut for bytes::BytesMut { } unsafe fn set_init(&mut self, init_len: usize) { - if self.len() < init_len { - self.set_len(init_len); - } + self.set_len(init_len); } } From 948772a1751778a83ab570de9e84d99a6c9af884 Mon Sep 17 00:00:00 2001 From: ollie-etl <72926894+ollie-etl@users.noreply.github.com> Date: Fri, 7 Jul 2023 10:59:59 +0100 Subject: [PATCH 07/20] Make tolerent to unexpected error --- src/io/send_zc.rs | 2 +- src/io/sendmsg_zc.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/io/send_zc.rs b/src/io/send_zc.rs index df37722b..43afbf1e 100644 --- a/src/io/send_zc.rs +++ b/src/io/send_zc.rs @@ -53,6 +53,6 @@ impl Completable for SendZc { impl Updateable for SendZc { fn update(&mut self, cqe: CqeResult) { // uring send_zc promises there will be no error on CQE's marked more - self.bytes += *cqe.result.as_ref().unwrap() as usize; + self.bytes += cqe.result.unwrap_or_default() as usize; } } diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index e7da9fe5..719397c2 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -113,6 +113,6 @@ impl Completable for SendMsgZc { impl Updateable for SendMsgZc { fn update(&mut self, cqe: CqeResult) { // uring send_zc promises there will be no error on CQE's marked more - self.bytes += *cqe.result.as_ref().unwrap() as usize; + self.bytes += cqe.result.unwrap_or_default() as usize; } } From 0e99bb00d5619ad414265c918cc216bda8f0e525 Mon Sep 17 00:00:00 2001 From: ollie-etl <72926894+ollie-etl@users.noreply.github.com> Date: Fri, 7 Jul 2023 11:35:17 +0100 Subject: [PATCH 08/20] Trace flags --- Cargo.toml | 1 + src/io/send_zc.rs | 4 ++++ src/io/sendmsg_zc.rs | 2 ++ 3 files changed, 7 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 9b707e80..690d48ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ io-uring = "0.5.13" socket2 = { version = "0.4.4", features = ["all"] } bytes = { version = "1.0", optional = true } futures-util = { version = "0.3.26", default-features = false, features = ["std"] } +tracing = "*" [dev-dependencies] tempfile = "3.2.0" diff --git a/src/io/send_zc.rs b/src/io/send_zc.rs index 43afbf1e..37f7b945 100644 --- a/src/io/send_zc.rs +++ b/src/io/send_zc.rs @@ -42,6 +42,8 @@ impl Completable for SendZc { type Output = BufResult; fn complete(self, cqe: CqeResult) -> Self::Output { + let flags = cqe.flags; + tracing::trace!("Complete: flags {flags}"); // Convert the operation result to `usize` let res = cqe.result.map(|v| self.bytes + v as usize); // Recover the buffer @@ -53,6 +55,8 @@ impl Completable for SendZc { impl Updateable for SendZc { fn update(&mut self, cqe: CqeResult) { // uring send_zc promises there will be no error on CQE's marked more + let flags = cqe.flags; + tracing::trace!("Update: flags {flags}"); self.bytes += cqe.result.unwrap_or_default() as usize; } } diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index 719397c2..4a21411f 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -113,6 +113,8 @@ impl Completable for SendMsgZc { impl Updateable for SendMsgZc { fn update(&mut self, cqe: CqeResult) { // uring send_zc promises there will be no error on CQE's marked more + let flags = cqe.flags; + tracing::trace!("Update: flags {flags}"); self.bytes += cqe.result.unwrap_or_default() as usize; } } From 5799f1a4c2cf7b79db328e50511805c8713e9b9e Mon Sep 17 00:00:00 2001 From: ollie-etl <72926894+ollie-etl@users.noreply.github.com> Date: Fri, 7 Jul 2023 11:50:51 +0100 Subject: [PATCH 09/20] Add logging --- src/io/sendmsg_zc.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index 4a21411f..952501a6 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -97,6 +97,8 @@ impl Completable for SendMsgZc { type Output = (io::Result, Vec, Option); fn complete(self, cqe: CqeResult) -> (io::Result, Vec, Option) { + let flags = cqe.flags; + tracing::trace!("Complete: flags {flags}"); // Convert the operation result to `usize`, and add previous byte count let res = cqe.result.map(|v| self.bytes + v as usize); From 7efa8cce53f73407cd5a0fb107163b6d584cee49 Mon Sep 17 00:00:00 2001 From: ollie-etl <72926894+ollie-etl@users.noreply.github.com> Date: Fri, 7 Jul 2023 12:03:45 +0100 Subject: [PATCH 10/20] Better debug --- src/io/sendmsg_zc.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index 952501a6..2ad71373 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -98,9 +98,11 @@ impl Completable for SendMsgZc { fn complete(self, cqe: CqeResult) -> (io::Result, Vec, Option) { let flags = cqe.flags; + let result = cqe.result; tracing::trace!("Complete: flags {flags}"); + tracing::trace!("Update: flags {flags}, res {result:?}"); // Convert the operation result to `usize`, and add previous byte count - let res = cqe.result.map(|v| self.bytes + v as usize); + let res = result.map(|v| self.bytes + v as usize); // Recover the data buffers. let io_bufs = self.io_bufs; @@ -116,7 +118,8 @@ impl Updateable for SendMsgZc { fn update(&mut self, cqe: CqeResult) { // uring send_zc promises there will be no error on CQE's marked more let flags = cqe.flags; - tracing::trace!("Update: flags {flags}"); - self.bytes += cqe.result.unwrap_or_default() as usize; + let result = cqe.result; + tracing::trace!("Update: flags {flags}, res {result:?}"); + self.bytes += result.unwrap_or_default() as usize; } } From b1ad8ea8be34a81b3cd3e3bcb032d888b290b5ee Mon Sep 17 00:00:00 2001 From: ollie-etl <72926894+ollie-etl@users.noreply.github.com> Date: Fri, 7 Jul 2023 12:53:52 +0100 Subject: [PATCH 11/20] Remove trace --- Cargo.toml | 1 - src/io/send_zc.rs | 4 ---- src/io/sendmsg_zc.rs | 11 ++--------- 3 files changed, 2 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 690d48ca..9b707e80 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,6 @@ io-uring = "0.5.13" socket2 = { version = "0.4.4", features = ["all"] } bytes = { version = "1.0", optional = true } futures-util = { version = "0.3.26", default-features = false, features = ["std"] } -tracing = "*" [dev-dependencies] tempfile = "3.2.0" diff --git a/src/io/send_zc.rs b/src/io/send_zc.rs index 37f7b945..43afbf1e 100644 --- a/src/io/send_zc.rs +++ b/src/io/send_zc.rs @@ -42,8 +42,6 @@ impl Completable for SendZc { type Output = BufResult; fn complete(self, cqe: CqeResult) -> Self::Output { - let flags = cqe.flags; - tracing::trace!("Complete: flags {flags}"); // Convert the operation result to `usize` let res = cqe.result.map(|v| self.bytes + v as usize); // Recover the buffer @@ -55,8 +53,6 @@ impl Completable for SendZc { impl Updateable for SendZc { fn update(&mut self, cqe: CqeResult) { // uring send_zc promises there will be no error on CQE's marked more - let flags = cqe.flags; - tracing::trace!("Update: flags {flags}"); self.bytes += cqe.result.unwrap_or_default() as usize; } } diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index 2ad71373..719397c2 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -97,12 +97,8 @@ impl Completable for SendMsgZc { type Output = (io::Result, Vec, Option); fn complete(self, cqe: CqeResult) -> (io::Result, Vec, Option) { - let flags = cqe.flags; - let result = cqe.result; - tracing::trace!("Complete: flags {flags}"); - tracing::trace!("Update: flags {flags}, res {result:?}"); // Convert the operation result to `usize`, and add previous byte count - let res = result.map(|v| self.bytes + v as usize); + let res = cqe.result.map(|v| self.bytes + v as usize); // Recover the data buffers. let io_bufs = self.io_bufs; @@ -117,9 +113,6 @@ impl Completable for SendMsgZc { impl Updateable for SendMsgZc { fn update(&mut self, cqe: CqeResult) { // uring send_zc promises there will be no error on CQE's marked more - let flags = cqe.flags; - let result = cqe.result; - tracing::trace!("Update: flags {flags}, res {result:?}"); - self.bytes += result.unwrap_or_default() as usize; + self.bytes += cqe.result.unwrap_or_default() as usize; } } From 36f124b7c960b6f8e8471be43b008b66ddaa429d Mon Sep 17 00:00:00 2001 From: ollie-etl <72926894+ollie-etl@users.noreply.github.com> Date: Thu, 13 Jul 2023 11:03:17 +0100 Subject: [PATCH 12/20] Add builder support for setting worker queues --- src/lib.rs | 13 +++++++++++++ src/runtime/driver/mod.rs | 4 ++++ 2 files changed, 17 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index d1cc6e02..eb8f01b4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -163,6 +163,7 @@ pub fn uring_builder() -> io_uring::Builder { // #[derive(Clone, Default)] pub struct Builder { entries: u32, + max_workers: [u32; 2], urb: io_uring::Builder, } @@ -175,6 +176,7 @@ pub struct Builder { pub fn builder() -> Builder { Builder { entries: 256, + max_workers: [0; 2], urb: io_uring::IoUring::builder(), } } @@ -191,6 +193,17 @@ impl Builder { self } + /// Get and/or set the limit for number of io_uring worker threads per NUMA + /// node. `bounded` holds the limit for bounded workers, which process I/O + /// operations expected to be bound in time, that is I/O on regular files or + /// block devices. While `unbounded` holds the limit for unbounded workers, + /// which carry out I/O operations that can never complete, for instance I/O + /// on sockets. Setting `None` leaves the default + pub fn max_workers(&mut self, bounded: Option, unbounded: Option) -> &mut Self { + self.max_workers = [bounded.unwrap_or_default(), unbounded.unwrap_or_default()]; + self + } + /// Replaces the default [`io_uring::Builder`], which controls the settings for the /// inner `io_uring` API. /// diff --git a/src/runtime/driver/mod.rs b/src/runtime/driver/mod.rs index 21d7de0b..3c0915cb 100644 --- a/src/runtime/driver/mod.rs +++ b/src/runtime/driver/mod.rs @@ -40,6 +40,10 @@ impl Driver { pub(crate) fn new(b: &crate::Builder) -> io::Result { let uring = b.urb.build(b.entries)?; + uring + .submitter() + .register_iowq_max_workers(&mut b.max_workers.clone())?; + Ok(Driver { ops: Ops::new(), uring, From c5eaa84188fca6cccf9ea2dba66c5ea9d2548324 Mon Sep 17 00:00:00 2001 From: ollie-etl <72926894+ollie-etl@users.noreply.github.com> Date: Thu, 13 Jul 2023 15:46:34 +0100 Subject: [PATCH 13/20] Update io-uring dep --- Cargo.toml | 2 +- src/io/fallocate.rs | 4 ++-- src/io/writev_all.rs | 2 +- src/runtime/driver/mod.rs | 8 +++++--- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9b707e80..8bdec82e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ keywords = ["async", "fs", "io-uring"] tokio = { version = "1.2", features = ["net", "rt", "sync"] } slab = "0.4.2" libc = "0.2.80" -io-uring = "0.5.13" +io-uring = "0.6.0" socket2 = { version = "0.4.4", features = ["all"] } bytes = { version = "1.0", optional = true } futures-util = { version = "0.3.26", default-features = false, features = ["std"] } diff --git a/src/io/fallocate.rs b/src/io/fallocate.rs index fa932124..382710d0 100644 --- a/src/io/fallocate.rs +++ b/src/io/fallocate.rs @@ -25,8 +25,8 @@ impl Op { x.handle().expect("not in a runtime context").submit_op( Fallocate { fd: fd.clone() }, |fallocate| { - opcode::Fallocate64::new(types::Fd(fallocate.fd.raw_fd()), len as _) - .offset64(offset as _) + opcode::Fallocate::new(types::Fd(fallocate.fd.raw_fd()), len as _) + .offset(offset as _) .mode(flags) .build() }, diff --git a/src/io/writev_all.rs b/src/io/writev_all.rs index 38ee8a63..ef5b9d40 100644 --- a/src/io/writev_all.rs +++ b/src/io/writev_all.rs @@ -134,7 +134,7 @@ impl Op> { // So this wouldn't need to be a function. Just pass in the entry. |write| { opcode::Writev::new(types::Fd(write.fd.raw_fd()), iovs_ptr, iovs_len) - .offset64(offset as _) + .offset(offset as _) .build() }, ) diff --git a/src/runtime/driver/mod.rs b/src/runtime/driver/mod.rs index 3c0915cb..357255fb 100644 --- a/src/runtime/driver/mod.rs +++ b/src/runtime/driver/mod.rs @@ -101,9 +101,11 @@ impl Driver { &mut self, buffers: Rc>, ) -> io::Result<()> { - self.uring - .submitter() - .register_buffers(buffers.borrow().iovecs())?; + unsafe { + self.uring + .submitter() + .register_buffers(buffers.borrow().iovecs())?; + } self.fixed_buffers = Some(buffers); Ok(()) From e5c17c4c42ec78528a51561db2d236d6bd720673 Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Tue, 8 Aug 2023 14:24:14 +0100 Subject: [PATCH 14/20] Update for io-uring 0.6.0 crate. --- src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index dc0a901b..42b24987 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -156,7 +156,7 @@ pub fn start(future: F) -> F::Output { /// having to use the io_uring crate as well. Refer to Builder::start example /// for its intended usage. pub fn uring_builder() -> io_uring::Builder { - io_uring::IoUring::generic_builder() + io_uring::IoUring::builder() } /// Builder API that can create and start the `io_uring` runtime with non-default parameters, @@ -178,7 +178,7 @@ pub fn builder() -> Builder { Builder { entries: 256, max_workers: [0; 2], - urb: io_uring::IoUring::generic_builder(), + urb: io_uring::IoUring::builder(), } } From 8fc2f7b8f93ab30e63bebc3f6e9407bb8a21b5bb Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Tue, 8 Aug 2023 15:47:39 +0100 Subject: [PATCH 15/20] Fix submit_op_2 --- src/runtime/driver/mod.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/runtime/driver/mod.rs b/src/runtime/driver/mod.rs index a8c700d4..0a676987 100644 --- a/src/runtime/driver/mod.rs +++ b/src/runtime/driver/mod.rs @@ -138,10 +138,14 @@ impl Driver { )) } - pub(crate) fn submit_op_2(&mut self, sqe: squeue::Entry) -> usize { + pub(crate) fn submit_op_2(&mut self, sqe: A) -> usize + where + A: Into, + { let index = self.ops.insert(); // Configure the SQE + let sqe: SEntry = sqe.into(); let sqe = sqe.user_data(index as _); // Push the new operation From 6c9a473513f7d2c0c400d1ddc379434b94705c76 Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Tue, 8 Aug 2023 15:55:51 +0100 Subject: [PATCH 16/20] Update --- src/runtime/driver/mod.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/runtime/driver/mod.rs b/src/runtime/driver/mod.rs index 0a676987..2930f86d 100644 --- a/src/runtime/driver/mod.rs +++ b/src/runtime/driver/mod.rs @@ -514,9 +514,14 @@ impl Ops { self.lifecycle.remove(index); } - fn complete(&mut self, index: usize, cqe: cqueue::Entry) { + fn complete(&mut self, index: usize, cqe: A) + where + A: Into, + { let completions = &mut self.completions; - if self.lifecycle[index].complete(completions, cqe) { + let cqe: CEntry = cqe.into(); + + if self.lifecycle[index].complete(completions, cqe.into()) { self.lifecycle.remove(index); } } From 493a9a43a4454961415af16a9ca0914faaacf285 Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Wed, 9 Aug 2023 10:52:24 +0100 Subject: [PATCH 17/20] Update --- src/runtime/driver/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/runtime/driver/mod.rs b/src/runtime/driver/mod.rs index 2930f86d..1451a0b4 100644 --- a/src/runtime/driver/mod.rs +++ b/src/runtime/driver/mod.rs @@ -146,7 +146,7 @@ impl Driver { // Configure the SQE let sqe: SEntry = sqe.into(); - let sqe = sqe.user_data(index as _); + let sqe: SEntry = sqe.user_data(index as _); // Push the new operation while unsafe { self.uring.submission().push(&sqe).is_err() } { From 15b325b77579d504973b111d568b7275afb58fec Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Wed, 9 Aug 2023 10:55:41 +0100 Subject: [PATCH 18/20] Update --- src/runtime/driver/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/runtime/driver/mod.rs b/src/runtime/driver/mod.rs index 1451a0b4..d06d43b1 100644 --- a/src/runtime/driver/mod.rs +++ b/src/runtime/driver/mod.rs @@ -172,7 +172,7 @@ impl Driver { // Configure the SQE let sqe: SEntry = f(&mut data).into(); - let sqe = sqe.user_data(index as _); + let sqe: SEntry = sqe.user_data(index as _); // Create the operation let op = Op::new(handle, data, index); From 243096d16c3aa6f33d84108625275dd5e3cc4eac Mon Sep 17 00:00:00 2001 From: MA-ETL Date: Thu, 10 Aug 2023 11:34:01 +0100 Subject: [PATCH 19/20] Update --- Cargo.toml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 241cbb1b..17385d9b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,9 @@ libc = "0.2.80" io-uring = "0.6.0" socket2 = { version = "0.4.4", features = ["all"] } bytes = { version = "1.0", optional = true } -futures-util = { version = "0.3.26", default-features = false, features = ["std"] } +futures-util = { version = "0.3.26", default-features = false, features = [ + "std", +] } [dev-dependencies] tempfile = "3.2.0" From 600bf9758ea12314975ce55143a319cdabd417a8 Mon Sep 17 00:00:00 2001 From: ollie-etl <72926894+ollie-etl@users.noreply.github.com> Date: Wed, 16 Aug 2023 11:47:54 +0000 Subject: [PATCH 20/20] Mark api as unsafe, and provide a metadata store --- src/fs/file.rs | 12 ++++++++++-- src/io/uring_cmd.rs | 20 +++++++++++++------- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/src/fs/file.rs b/src/fs/file.rs index a638d184..2d92dd5e 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -183,8 +183,16 @@ impl File { } /// A file/device-specific 16-byte command, akin (but not equivalent) to ioctl(2). - pub async fn uring_cmd16(&self, cmd_op: u32, cmd: [u8; 16]) -> io::Result { - let op = Op::uring_cmd16(&self.fd, cmd_op, cmd).unwrap(); + /// + /// Marked as unsafe, as it is the callers responsibility to ensure that any data which is required + /// to be stable is either passed in as the metadata argument, or otherwise stable through the life of the operation + pub async unsafe fn uring_cmd16( + &self, + cmd_op: u32, + cmd: [u8; 16], + metadata: T, + ) -> (io::Result, T) { + let op = Op::uring_cmd16(&self.fd, cmd_op, cmd, metadata).unwrap(); op.await } diff --git a/src/io/uring_cmd.rs b/src/io/uring_cmd.rs index 6e69dff1..904f6fb9 100644 --- a/src/io/uring_cmd.rs +++ b/src/io/uring_cmd.rs @@ -3,25 +3,31 @@ use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; use std::io; -pub(crate) struct UringCmd16 { +pub(crate) struct UringCmd16 { /// Holds a strong ref to the FD, preventing the file from being closed /// while the operation is in-flight. #[allow(dead_code)] fd: SharedFd, + // Holds any required metadata stable for the lifetime of the operation + metadata: T, } -impl Op { +impl Op> { /// A file/device-specific 16-byte command, akin (but not equivalent) to ioctl pub(crate) fn uring_cmd16( fd: &SharedFd, cmd_op: u32, cmd: [u8; 16], - ) -> io::Result> { + metadata: T, + ) -> io::Result>> { use io_uring::{opcode, types}; CONTEXT.with(|x| { x.handle().expect("Not in a runtime context").submit_op( - UringCmd16 { fd: fd.clone() }, + UringCmd16 { + fd: fd.clone(), + metadata, + }, |_| { opcode::UringCmd16::new(types::Fd(fd.raw_fd()), cmd_op) .cmd(cmd) @@ -32,11 +38,11 @@ impl Op { } } -impl Completable for UringCmd16 { - type Output = io::Result; +impl Completable for UringCmd16 { + type Output = (std::result::Result, T); fn complete(self, cqe: CqeResult) -> Self::Output { - cqe.result + (cqe.result, self.metadata) } }