Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

op: poll and poll_multi #255

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ keywords = ["async", "fs", "io-uring"]

[dependencies]
tokio = { version = "1.2", features = ["net", "rt", "sync"] }
tokio-stream = "0.1.12"
slab = "0.4.2"
libc = "0.2.80"
io-uring = "0.5.13"
socket2 = { version = "0.4.4", features = ["all"] }
bytes = { version = "1.0", optional = true }
async-stream = "0.3.4"
futures = "0.3.26"
futures-core = "0.3.26"

[dev-dependencies]
tempfile = "3.2.0"
Expand Down
144 changes: 144 additions & 0 deletions src/buf/bufgroup/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
//! The io_uring device implements several provided-buffering mechanisms, which are all called
//! buffer groups in the liburing man pages.
//!
//! Buffer groups share a few things in common:
//! o all provide a mechanism to seed the kernel with userland buffers for use in various
//! read operations
//! o all use a u16 Buffer Group ID
//! o all use a u16 Buffer ID
//! o all are specified in the read or receive operations by setting
//! the IOSQE_BUFFER_SELECT bit in the sqe flags field and
//! then identifying the buffer group id in the sqe buf_group field
//! o all read or receive operations that used a buffer group have
//! the IORING_CQE_F_BUFFER bit set in the cqe flags field and
//! the buffer id chosen in the upper 16 bits of the cqe res field
//!
//! As of Oct 2022, the latest buffer group mechanism implemented by the io_uring device, and the
//! one that promises the best performance with least amount of overhead, is the buf_ring. The
//! buf_ring has several liburing man pages, the first to reference should probably be
//! io_uring_buf_ring_init(3).

use crate::buf::bufring::BufRing;

/// The buffer group ID.
///
/// The creater of a buffer group is responsible for picking a buffer group id
/// that does not conflict with other buffer group ids also being registered with the uring
/// interface.
pub(crate) type Bgid = u16;

// Future: Maybe create a bgid module with a trivial implementation of a type that tracks the next
// bgid to use. The crate's driver could do that perhaps, but there could be a benefit to tracking
// them across multiple thread's drivers. So there is flexibility in not building it into the
// driver.

/// The buffer ID. Buffer ids are assigned and used by the crate and probably are not visible
/// to the crate user.
pub(crate) type Bid = u16;

/// This tracks a buffer that has been filled in by the kernel, having gotten the memory
/// from a buffer ring, and returned to userland via a cqe entry.
pub struct BufX {
bgroup: BufRing,
bid: Bid,
len: usize,
}

impl BufX {
// # Safety
//
// The bid must be the buffer id supplied by the kernel as having been chosen and written to.
// The length of the buffer must represent the length written to by the kernel.
pub(crate) unsafe fn new(bgroup: BufRing, bid: Bid, len: usize) -> Self {
// len will already have been checked against the buf_capacity
// so it is guaranteed that len <= bgroup.buf_capacity.

Self { bgroup, bid, len }
}

/// Return the number of bytes initialized.
///
/// This value initially came from the kernel, as reported in the cqe. This value may have been
/// modified with a call to the IoBufMut::set_init method.
#[inline]
pub fn len(&self) -> usize {
self.len
}

/// Return true if this represents an empty buffer. The length reported by the kernel was 0.
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
}

/// Return the capacity of this buffer.
#[inline]
pub fn cap(&self) -> usize {
self.bgroup.buf_capacity(self.bid)
}

/// Return a byte slice reference.
#[inline]
pub fn as_slice(&self) -> &[u8] {
let p = self.bgroup.stable_ptr(self.bid);
// Safety: the pointer returned by stable_ptr is valid for the lifetime of self,
// and self's len is set when the kernel reports the amount of data that was
// written into the buffer.
unsafe { std::slice::from_raw_parts(p, self.len) }
}

/// Return a mutable byte slice reference.
#[inline]
pub fn as_slice_mut(&mut self) -> &mut [u8] {
let p = self.bgroup.stable_mut_ptr(self.bid);
// Safety: the pointer returned by stable_mut_ptr is valid for the lifetime of self,
// and self's len is set when the kernel reports the amount of data that was
// written into the buffer. In addition, we hold a &mut reference to self.
unsafe { std::slice::from_raw_parts_mut(p, self.len) }
}

// Future: provide access to the uninit space between len and cap if the buffer is being
// repurposed before being dropped. The set_init below does that too.
}

impl Drop for BufX {
fn drop(&mut self) {
// Add the buffer back to the bgroup, for the kernel to reuse.
// Safety: this function may only be called by the buffer's drop function.
unsafe { self.bgroup.dropping_bid(self.bid) };
}
}

unsafe impl crate::buf::IoBuf for BufX {
fn stable_ptr(&self) -> *const u8 {
self.bgroup.stable_ptr(self.bid)
}

fn bytes_init(&self) -> usize {
self.len
}

fn bytes_total(&self) -> usize {
self.cap()
}
}

unsafe impl crate::buf::IoBufMut for BufX {
fn stable_mut_ptr(&mut self) -> *mut u8 {
self.bgroup.stable_mut_ptr(self.bid)
}

unsafe fn set_init(&mut self, init_len: usize) {
if self.len < init_len {
let cap = self.bgroup.buf_capacity(self.bid);
assert!(init_len <= cap);
self.len = init_len;
}
}
}

impl From<BufX> for Vec<u8> {
fn from(item: BufX) -> Self {
item.as_slice().to_vec()
}
}
17 changes: 17 additions & 0 deletions src/buf/bufring/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
//! A buf_ring pool of buffers registered with the kernel.
//!
//! This module provides the [`BufRing`] and [`Builder`] to allow
//! using the `buf_ring` feature of the kernel's `io_uring` device.
//!
//! The [`BufRing`] is this library's only implementation of the device's more general `Provided
//! Buffers` feature where some device operations can work with buffers that had been provided to
//! the device at an earlier point, rather than as part of the operation itself.
//!
//! Operations like [`crate::net::TcpStream::recv_provbuf`] make use of the `buf_ring`. This
//! operation does not take a buffer as input, but does return a buffer when successful. Once the
//! buffer is dropped, it is returned to the `buf_ring`.

pub(crate) mod ring;

pub use ring::BufRing;
pub use ring::Builder;
Loading