Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Uring cmd #2

Draft
wants to merge 29 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
94e49a2
Add sqe128, cqe32, and uring-cmd features
Jan 13, 2023
e9d8810
Merge branch 'master' into entry128
thomasbarrett Jan 28, 2023
43fc143
Remove uring-cmd feature
Feb 5, 2023
a4b2576
Merge branch 'entry128' of github.com:thomasbarrett/tokio-uring into …
Feb 5, 2023
6125abb
Only run uring_cmd16 test if opcode::UringCmd16 is supported on test …
Feb 5, 2023
a6b3a8d
Merge branch 'tokio-rs:master' into entry128
thomasbarrett Feb 5, 2023
805a265
Fix formatting in tests
Feb 6, 2023
52b29de
Box libc::msghdr in SendMesgZc
ollie-etl Mar 20, 2023
3b0d245
Merge branch 'master' into bug-unstable-msghdr
ollie-etl Mar 30, 2023
0d24217
Remove check on shrinking in set_init
ollie-etl Jul 4, 2023
d425b17
Merge remote-tracking branch 'origin/bug-unstable-msghdr' into bug-un…
ollie-etl Jul 4, 2023
e65d8fe
Merge branch 'tokio-rs:master' into bug-unstable-msghdr
ollie-etl Jul 4, 2023
dc8990d
Merge remote-tracking branch 'origin/bug-unstable-msghdr' into bug-un…
ollie-etl Jul 4, 2023
948772a
Make tolerent to unexpected error
ollie-etl Jul 7, 2023
0e99bb0
Trace flags
ollie-etl Jul 7, 2023
5799f1a
Add logging
ollie-etl Jul 7, 2023
7efa8cc
Better debug
ollie-etl Jul 7, 2023
b1ad8ea
Remove trace
ollie-etl Jul 7, 2023
36f124b
Add builder support for setting worker queues
ollie-etl Jul 13, 2023
c5eaa84
Update io-uring dep
ollie-etl Jul 13, 2023
7abf120
Merge remote-tracking branch 'io_uring_pr/entry128' into uring_cmd
MA-ETL Aug 8, 2023
ce6f758
Merge remote-tracking branch 'etl/etl-dev' into uring_cmd_dev
MA-ETL Aug 8, 2023
e5c17c4
Update for io-uring 0.6.0 crate.
MA-ETL Aug 8, 2023
8fc2f7b
Fix submit_op_2
MA-ETL Aug 8, 2023
6c9a473
Update
MA-ETL Aug 8, 2023
493a9a4
Update
MA-ETL Aug 9, 2023
15b325b
Update
MA-ETL Aug 9, 2023
243096d
Update
MA-ETL Aug 10, 2023
600bf97
Mark api as unsafe, and provide a metadata store
ollie-etl Aug 16, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,22 @@ keywords = ["async", "fs", "io-uring"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
# Enable 128-byte submission queue entries. This requires Linux >= 5.19.
sqe128 = []
# Enable 32-byte completion queue entries. This requires Linux >= 5.19.
cqe32 = []

[dependencies]
tokio = { version = "1.2", features = ["net", "rt", "sync"] }
slab = "0.4.2"
libc = "0.2.80"
io-uring = "0.5.13"
io-uring = "0.6.0"
socket2 = { version = "0.4.4", features = ["all"] }
bytes = { version = "1.0", optional = true }
futures-util = { version = "0.3.26", default-features = false, features = ["std"] }
futures-util = { version = "0.3.26", default-features = false, features = [
"std",
] }

[dev-dependencies]
tempfile = "3.2.0"
Expand Down
4 changes: 1 addition & 3 deletions src/buf/fixed/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ unsafe impl IoBufMut for FixedBuf {
}

unsafe fn set_init(&mut self, pos: usize) {
if self.buf.init_len < pos {
self.buf.init_len = pos
}
self.buf.init_len = pos
}
}

Expand Down
8 changes: 2 additions & 6 deletions src/buf/io_buf_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ unsafe impl IoBufMut for Vec<u8> {
}

unsafe fn set_init(&mut self, init_len: usize) {
if self.len() < init_len {
self.set_len(init_len);
}
self.set_len(init_len);
}
}

Expand All @@ -53,8 +51,6 @@ unsafe impl IoBufMut for bytes::BytesMut {
}

unsafe fn set_init(&mut self, init_len: usize) {
if self.len() < init_len {
self.set_len(init_len);
}
self.set_len(init_len);
}
}
13 changes: 13 additions & 0 deletions src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,19 @@ impl File {
op.await
}

/// A file/device-specific 16-byte command, akin (but not equivalent) to ioctl(2).
pub async fn uring_cmd16(&self, cmd_op: u32, cmd: [u8; 16]) -> io::Result<u32> {
let op = Op::uring_cmd16(&self.fd, cmd_op, cmd).unwrap();
op.await
}

#[cfg(feature = "sqe128")]
/// A file/device-specific 80-byte command, akin (but not equivalent) to ioctl(2).
pub async fn uring_cmd80(&self, cmd_op: u32, cmd: [u8; 80]) -> io::Result<u32> {
let op = Op::uring_cmd80(&self.fd, cmd_op, cmd).unwrap();
op.await
}

/// Read some bytes at the specified offset from the file into the specified
/// array of buffers, returning how many bytes were read.
///
Expand Down
4 changes: 2 additions & 2 deletions src/io/fallocate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ impl Op<Fallocate> {
x.handle().expect("not in a runtime context").submit_op(
Fallocate { fd: fd.clone() },
|fallocate| {
opcode::Fallocate64::new(types::Fd(fallocate.fd.raw_fd()), len as _)
.offset64(offset as _)
opcode::Fallocate::new(types::Fd(fallocate.fd.raw_fd()), len as _)
.offset(offset as _)
.mode(flags)
.build()
},
Expand Down
2 changes: 2 additions & 0 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,5 @@ mod writev;

mod writev_all;
pub(crate) use writev_all::writev_at_all;

mod uring_cmd;
2 changes: 1 addition & 1 deletion src/io/send_zc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,6 @@ impl<T> Completable for SendZc<T> {
impl<T> Updateable for SendZc<T> {
fn update(&mut self, cqe: CqeResult) {
// uring send_zc promises there will be no error on CQE's marked more
self.bytes += *cqe.result.as_ref().unwrap() as usize;
self.bytes += cqe.result.unwrap_or_default() as usize;
}
}
2 changes: 1 addition & 1 deletion src/io/sendmsg_zc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,6 @@ impl<T, U> Completable for SendMsgZc<T, U> {
impl<T, U> Updateable for SendMsgZc<T, U> {
fn update(&mut self, cqe: CqeResult) {
// uring send_zc promises there will be no error on CQE's marked more
self.bytes += *cqe.result.as_ref().unwrap() as usize;
self.bytes += cqe.result.unwrap_or_default() as usize;
}
}
81 changes: 81 additions & 0 deletions src/io/uring_cmd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use crate::io::SharedFd;
use crate::runtime::driver::op::{Completable, CqeResult, Op};
use crate::runtime::CONTEXT;
use std::io;

pub(crate) struct UringCmd16 {
/// Holds a strong ref to the FD, preventing the file from being closed
/// while the operation is in-flight.
#[allow(dead_code)]
fd: SharedFd,
}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub(crate) struct UringCmd16 {
/// Holds a strong ref to the FD, preventing the file from being closed
/// while the operation is in-flight.
#[allow(dead_code)]
fd: SharedFd,
}
pub(crate) struct UringCmd16<T)> {
/// Holds a strong ref to the FD, preventing the file from being closed
/// while the operation is in-flight.
#[allow(dead_code)]
fd: SharedFd,
// Holds side channel data for the duration of the operation
// This allows the construction of safe apis
t: T
}


impl Op<UringCmd16> {
/// A file/device-specific 16-byte command, akin (but not equivalent) to ioctl
pub(crate) fn uring_cmd16(
fd: &SharedFd,
cmd_op: u32,
cmd: [u8; 16],
) -> io::Result<Op<UringCmd16>> {
use io_uring::{opcode, types};

CONTEXT.with(|x| {
x.handle().expect("Not in a runtime context").submit_op(
UringCmd16 { fd: fd.clone() },
|_| {
opcode::UringCmd16::new(types::Fd(fd.raw_fd()), cmd_op)
.cmd(cmd)
.build()
},
)
})
}
}

impl Completable for UringCmd16 {
type Output = io::Result<u32>;

fn complete(self, cqe: CqeResult) -> Self::Output {
cqe.result
}
}

#[cfg(feature = "sqe128")]
pub(crate) struct UringCmd80 {
/// Holds a strong ref to the FD, preventing the file from being closed
/// while the operation is in-flight.
#[allow(dead_code)]
fd: SharedFd,
}

#[cfg(feature = "sqe128")]
impl Op<UringCmd80> {
/// A file/device-specific 80-byte command, akin (but not equivalent) to ioctl
pub(crate) fn uring_cmd80(
fd: &SharedFd,
cmd_op: u32,
cmd: [u8; 80],
) -> io::Result<Op<UringCmd80>> {
use io_uring::{opcode, types};

CONTEXT.with(|x| {
x.handle().expect("Not in a runtime context").submit_op(
UringCmd80 { fd: fd.clone() },
|_| {
opcode::UringCmd80::new(types::Fd(fd.raw_fd()), cmd_op)
.cmd(cmd)
.build()
},
)
})
}
}

#[cfg(feature = "sqe128")]
impl Completable for UringCmd80 {
type Output = io::Result<u32>;

fn complete(self, cqe: CqeResult) -> Self::Output {
cqe.result
}
}
2 changes: 1 addition & 1 deletion src/io/writev_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl<T: BoundedBuf> Op<WritevAll<T>> {
// So this wouldn't need to be a function. Just pass in the entry.
|write| {
opcode::Writev::new(types::Fd(write.fd.raw_fd()), iovs_ptr, iovs_len)
.offset64(offset as _)
.offset(offset as _)
.build()
},
)
Expand Down
23 changes: 19 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub use runtime::spawn;
pub use runtime::Runtime;

use crate::runtime::driver::op::Op;
use crate::runtime::driver::{CEntry, SEntry};
use std::future::Future;

/// Starts an `io_uring` enabled Tokio runtime.
Expand Down Expand Up @@ -154,7 +155,7 @@ pub fn start<F: Future>(future: F) -> F::Output {
/// This function is provided to avoid requiring the user of this crate from
/// having to use the io_uring crate as well. Refer to Builder::start example
/// for its intended usage.
pub fn uring_builder() -> io_uring::Builder {
pub fn uring_builder() -> io_uring::Builder<SEntry, CEntry> {
io_uring::IoUring::builder()
}

Expand All @@ -163,7 +164,8 @@ pub fn uring_builder() -> io_uring::Builder {
// #[derive(Clone, Default)]
pub struct Builder {
entries: u32,
urb: io_uring::Builder,
max_workers: [u32; 2],
urb: io_uring::Builder<SEntry, CEntry>,
}

/// Constructs a [`Builder`] with default settings.
Expand All @@ -175,6 +177,7 @@ pub struct Builder {
pub fn builder() -> Builder {
Builder {
entries: 256,
max_workers: [0; 2],
urb: io_uring::IoUring::builder(),
}
}
Expand All @@ -191,11 +194,23 @@ impl Builder {
self
}

/// Get and/or set the limit for number of io_uring worker threads per NUMA
/// node. `bounded` holds the limit for bounded workers, which process I/O
/// operations expected to be bound in time, that is I/O on regular files or
/// block devices. While `unbounded` holds the limit for unbounded workers,
/// which carry out I/O operations that can never complete, for instance I/O
/// on sockets. Setting `None` leaves the default
pub fn max_workers(&mut self, bounded: Option<u32>, unbounded: Option<u32>) -> &mut Self {
self.max_workers = [bounded.unwrap_or_default(), unbounded.unwrap_or_default()];
self
}

/// Replaces the default [`io_uring::Builder`], which controls the settings for the
/// inner `io_uring` API.
///
/// Refer to the [`io_uring::Builder`] documentation for all the supported methods.
pub fn uring_builder(&mut self, b: &io_uring::Builder) -> &mut Self {
/// Refer to the Builder start method for an example.
/// Refer to the io_uring::builder documentation for all the supported methods.
pub fn uring_builder(&mut self, b: &io_uring::Builder<SEntry, CEntry>) -> &mut Self {
self.urb = b.clone();
self
}
Expand Down
7 changes: 4 additions & 3 deletions src/runtime/driver/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::task::{Context, Poll};

use crate::buf::fixed::FixedBuffers;
use crate::runtime::driver::op::{Completable, MultiCQEFuture, Op, Updateable};
use crate::runtime::driver::Driver;
use crate::runtime::driver::{Driver, SEntry};

#[derive(Clone)]
pub(crate) struct Handle {
Expand Down Expand Up @@ -67,10 +67,11 @@ impl Handle {
self.inner.borrow_mut().submit_op_2(sqe)
}

pub(crate) fn submit_op<T, S, F>(&self, data: T, f: F) -> io::Result<Op<T, S>>
pub(crate) fn submit_op<T, S, F, A>(&self, data: T, f: F) -> io::Result<Op<T, S>>
where
T: Completable,
F: FnOnce(&mut T) -> squeue::Entry,
A: Into<SEntry>,
F: FnOnce(&mut T) -> A,
{
self.inner.borrow_mut().submit_op(data, f, self.into())
}
Expand Down
Loading
Loading