From d2b3418ff553b75da397e8534ef84f784b8d7682 Mon Sep 17 00:00:00 2001
From: Frank Rehwinkel
Date: Mon, 20 Feb 2023 19:31:27 -0500
Subject: [PATCH 01/16] BufRing: Provided Buffer Pool

Adds new types: bufring::{BufRing, Builder}, bufgroup::{BufX, Bgid, Bid}

(BufX was chosen as the name of the buffer type of this pool simply to
make it short and easy to grep for.)

bufgroup may be better named provbuf, or provided_buffers, down the
road, especially if we intend to support the other, older, much less
efficient provided buffer mechanism. But the liburing team specifically
recommends the buf_ring implementation for ease of use and performance
reasons, so we may not want to incur the overhead of supporting both
forms.

bufring could become a trait down the road, where different BufRing
implementations can be provided. But to avoid the extra decision making
that comes with designing traits, and to keep the code relatively easy
to follow, these are all concrete types.

Adds register and unregister functionality to BufRing and to the
tokio_uring driver.

Adds experimental recv and recv_provbuf methods to TcpStream.
recv_provbuf is a purposefully ugly name. It will be replaced by
something ... maybe a recv builder, sometime soon. Whatever is chosen
would then be copied for Udp, Unix and for all the other operations
that optionally take a provided buffer pool id.

Adds 'net' unit tests. All are admittedly simple ping/pong tests where
only the clients' received lengths are checked, not the actual data.

Adds a tests/common area.

Adds a test case that uses two std::threads, where each thread runs its
own tokio_uring runtime and its own buf_ring provided buffer pool. The
two-thread case is made long, with many clients, sending many large
messages to the server and getting them back, in order to see gross
performance impacts when changing things. It takes 3s on my machine.
Before going into mainline, the numbers would be changed so it took no
more than the other unit tests, so about 10ms.

Many TODOs left to clean up. Primarily Safety rationalizations.

The buffer allocation is made as a single allocation, along with the
ring.

The buffer group id, bgid, also sometimes called the provided buffer
group id, is manually selected through the builder api. There is no
mechanism to pick one automatically. That could be added later but is
not really necessary for this feature to be useful.

This first implementation is without traits and without public
interfaces that would let a user create a different kind of buf_ring
or a different kind of `provided buffers` pool.

There's a question to the liburing team outstanding about how to
interpret an unexpected cqe result of res=0 and flags=4.
---
 src/buf/bufgroup/mod.rs | 144 +++++++
 src/buf/bufring/mod.rs  |  17 +
 src/buf/bufring/ring.rs | 843 ++++++++++++++++++++++++++++++++++++++++
 src/buf/mod.rs          |   6 +-
 4 files changed, 1009 insertions(+), 1 deletion(-)
 create mode 100644 src/buf/bufgroup/mod.rs
 create mode 100644 src/buf/bufring/mod.rs
 create mode 100644 src/buf/bufring/ring.rs

diff --git a/src/buf/bufgroup/mod.rs b/src/buf/bufgroup/mod.rs
new file mode 100644
index 00000000..bcd3c119
--- /dev/null
+++ b/src/buf/bufgroup/mod.rs
@@ -0,0 +1,144 @@
+//! The io_uring device implements several provided-buffering mechanisms, which are all called
+//! buffer groups in the liburing man pages.
+//!
+//! Buffer groups share a few things in common:
+//!     o all provide a mechanism to seed the kernel with userland buffers for use in various
+//!       read operations
+//!     o all use a u16 Buffer Group ID
+//!     o all use a u16 Buffer ID
+//!     o all are specified in the read or receive operations by setting
+//!       the IOSQE_BUFFER_SELECT bit in the sqe flags field and
+//!       then identifying the buffer group id in the sqe buf_group field
+//!     o all read or receive operations that used a buffer group have
+//!       the IORING_CQE_F_BUFFER bit set in the cqe flags field and
+//!       the buffer id chosen in the upper 16 bits of the cqe res field
+//!
+//! As of Oct 2022, the latest buffer group mechanism implemented by the io_uring device, and the
+//! one that promises the best performance with the least amount of overhead, is the buf_ring.
+//! The buf_ring has several liburing man pages; the first to reference should probably be
+//! io_uring_buf_ring_init(3).

+use crate::buf::bufring::BufRing;

+/// The buffer group ID.
+///
+/// The creator of a buffer group is responsible for picking a buffer group id
+/// that does not conflict with other buffer group ids also being registered with the uring
+/// interface.
+pub(crate) type Bgid = u16;

+// Future: Maybe create a bgid module with a trivial implementation of a type that tracks the next
+// bgid to use. The crate's driver could do that perhaps, but there could be a benefit to tracking
+// them across multiple threads' drivers. So there is flexibility in not building it into the
+// driver.

+/// The buffer ID. Buffer ids are assigned and used by the crate and probably are not visible
+/// to the crate user.
+pub(crate) type Bid = u16;

+/// This tracks a buffer that has been filled in by the kernel, having gotten the memory
+/// from a buffer ring, and returned to userland via a cqe entry.
+pub struct BufX {
+    bgroup: BufRing,
+    bid: Bid,
+    len: usize,
+}

+impl BufX {
+    // # Safety
+    //
+    // The bid must be the buffer id supplied by the kernel as having been chosen and written to.
+    // The length of the buffer must represent the length written to by the kernel.
+    pub(crate) unsafe fn new(bgroup: BufRing, bid: Bid, len: usize) -> Self {
+        // len will already have been checked against the buf_capacity
+        // so it is guaranteed that len <= bgroup.buf_capacity.
+
+        Self { bgroup, bid, len }
+    }
+
+    /// Return the number of bytes initialized.
+    ///
+    /// This value initially came from the kernel, as reported in the cqe. This value may have been
+    /// modified with a call to the IoBufMut::set_init method.
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    /// Return true if this represents an empty buffer. The length reported by the kernel was 0.
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    /// Return the capacity of this buffer.
+    #[inline]
+    pub fn cap(&self) -> usize {
+        self.bgroup.buf_capacity(self.bid)
+    }
+
+    /// Return a byte slice reference.
+    #[inline]
+    pub fn as_slice(&self) -> &[u8] {
+        let p = self.bgroup.stable_ptr(self.bid);
+        // Safety: the pointer returned by stable_ptr is valid for the lifetime of self,
+        // and self's len is set when the kernel reports the amount of data that was
+        // written into the buffer.
+        unsafe { std::slice::from_raw_parts(p, self.len) }
+    }
+
+    /// Return a mutable byte slice reference.
+    #[inline]
+    pub fn as_slice_mut(&mut self) -> &mut [u8] {
+        let p = self.bgroup.stable_mut_ptr(self.bid);
+        // Safety: the pointer returned by stable_mut_ptr is valid for the lifetime of self,
+        // and self's len is set when the kernel reports the amount of data that was
+        // written into the buffer. In addition, we hold a &mut reference to self.
+        unsafe { std::slice::from_raw_parts_mut(p, self.len) }
+    }
+
+    // Future: provide access to the uninit space between len and cap if the buffer is being
+    // repurposed before being dropped. The set_init below does that too.
+}

+impl Drop for BufX {
+    fn drop(&mut self) {
+        // Add the buffer back to the bgroup, for the kernel to reuse.
+        // Safety: this function may only be called by the buffer's drop function.
+        unsafe { self.bgroup.dropping_bid(self.bid) };
+    }
+}

+unsafe impl crate::buf::IoBuf for BufX {
+    fn stable_ptr(&self) -> *const u8 {
+        self.bgroup.stable_ptr(self.bid)
+    }
+
+    fn bytes_init(&self) -> usize {
+        self.len
+    }
+
+    fn bytes_total(&self) -> usize {
+        self.cap()
+    }
+}

+unsafe impl crate::buf::IoBufMut for BufX {
+    fn stable_mut_ptr(&mut self) -> *mut u8 {
+        self.bgroup.stable_mut_ptr(self.bid)
+    }
+
+    unsafe fn set_init(&mut self, init_len: usize) {
+        if self.len < init_len {
+            let cap = self.bgroup.buf_capacity(self.bid);
+            assert!(init_len <= cap);
+            self.len = init_len;
+        }
+    }
+}

+impl From<BufX> for Vec<u8> {
+    fn from(item: BufX) -> Self {
+        item.as_slice().to_vec()
+    }
+}
diff --git a/src/buf/bufring/mod.rs b/src/buf/bufring/mod.rs
new file mode 100644
index 00000000..e10dd71a
--- /dev/null
+++ b/src/buf/bufring/mod.rs
@@ -0,0 +1,17 @@
+//! A buf_ring pool of buffers registered with the kernel.
+//!
+//! This module provides the [`BufRing`] and [`Builder`] to allow
+//! using the `buf_ring` feature of the kernel's `io_uring` device.
+//!
+//! The [`BufRing`] is this library's only implementation of the device's more general `Provided
+//! Buffers` feature, where some device operations can work with buffers that had been provided to
+//! the device at an earlier point, rather than as part of the operation itself.
+//!
+//! Operations like [`crate::net::TcpStream::recv_provbuf`] make use of the `buf_ring`. This
+//! operation does not take a buffer as input, but does return a buffer when successful. Once the
+//! buffer is dropped, it is returned to the `buf_ring`.

+pub(crate) mod ring;

+pub use ring::BufRing;
+pub use ring::Builder;
diff --git a/src/buf/bufring/ring.rs b/src/buf/bufring/ring.rs
new file mode 100644
index 00000000..03790a6f
--- /dev/null
+++ b/src/buf/bufring/ring.rs
@@ -0,0 +1,843 @@
+//! Module for the io_uring device's buf_ring feature.

+// Developer's note about io_uring return codes when a buf_ring is used:
+//
+// While a buf_ring pool is exhausted, new calls to read that are, or are not, ready to read will
+// fail with the 105 error, "no buffers", while existing calls that were waiting to become ready to
+// read will not fail. Only when the data becomes ready to read will they fail, if the buffer ring
+// is still empty at that time. This makes sense when thinking about it from how the kernel
+// implements the start of a read command; it can be confusing when first working with these
+// commands from the userland perspective.

+// The file! calls trigger a clippy false positive.
+#![allow(clippy::print_literal)]

+use io_uring::types;
+use std::cell::Cell;
+use std::io;
+use std::rc::Rc;
+use std::sync::atomic::{self, AtomicU16};

+use super::super::bufgroup::{Bgid, Bid, BufX};
+use crate::runtime::CONTEXT;

+/// A `BufRing` represents the ring and the buffers used with the kernel's io_uring buf_ring
+/// feature.
+///
+/// In this implementation, it is both the ring of buffer entries and the actual buffer
+/// allocations.
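+///
+/// A minimal usage sketch (the bgid 177 is an arbitrary example value, and this assumes a
+/// `tokio_uring` runtime is already active so the builder's automatic registration step can
+/// reach the driver):
+///
+/// ```no_run
+/// use tokio_uring::buf::bufring;
+///
+/// tokio_uring::start(async {
+///     let pool = bufring::Builder::new(177) // bgid chosen by the caller
+///         .ring_entries(16)
+///         .buf_len(4096)
+///         .build()
+///         .unwrap();
+///     // Use `pool` with provided-buffer operations; when the last clone is
+///     // dropped, the ring is unregistered and the buffer memory is freed.
+/// });
+/// ```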
+///
+/// A BufRing is created through the [`Builder`] and can be registered automatically by the
+/// builder's `build` step or at a later time by the user. Registration involves informing the
+/// kernel of the ring's dimensions and its identifier (its buffer group id, which goes by the name
+/// `bgid`).
+///
+/// Multiple buf_rings, here multiple BufRings, can be created and registered. BufRings are
+/// reference counted to ensure their memory is live while their BufX buffers are live. When a BufX
+/// buffer is dropped, it releases itself back to the BufRing from which it came, allowing it to be
+/// reused by the kernel.
+///
+/// It is perhaps worth pointing out that it is the ring itself that is registered with the kernel,
+/// not the buffers per se. While a given buf_ring cannot have its size changed dynamically, the
+/// buffers that are pushed to the ring by userland, and later potentially re-pushed in the ring,
+/// can change. The buffers can be of different sizes and they could come from different allocation
+/// blocks. This implementation does not provide that flexibility. Each BufRing comes with its own
+/// equal length buffer allocation. And when a BufRing buffer, a BufX, is dropped, its id is pushed
+/// back to the ring.
+///
+/// This is the one and only `Provided Buffers` implementation in `tokio_uring` at the moment and,
+/// in this version, is a purely concrete type, with a concrete BufX type for buffers that are
+/// returned by operations like `recv_provbuf` to the userland application.
+///
+/// Aside from the register and unregister steps, there are no syscalls used to pass buffers to the
+/// kernel. The ring contains a tail memory address that this userland type updates as buffers are
+/// added to the ring and which the kernel reads when it needs to pull a buffer from the ring. The
+/// kernel does not have a head pointer address that it updates for the userland. The userland
+/// (this type) is expected to avoid overwriting the head of the circular ring by keeping track of
+/// how many buffers were added to the ring and how many have been returned through the CQE
+/// mechanism. This particular implementation does not track the count because all buffers are
+/// allocated at the beginning, by the builder, and only its own buffers that came back via a CQE
+/// are ever added back to the ring, so it should be impossible to overflow the ring.
+#[derive(Clone, Debug)]
+pub struct BufRing {
+    // RawBufRing uses cell for fields where necessary.
+    raw: Rc<RawBufRing>,
+}

+// Methods for the user.

+impl BufRing {
+    /// Registers the buf_ring manually.
+    ///
+    /// This is not normally called because the builder defaults to registering the ring when it
+    /// builds the ring. This is provided for the case where a BufRing is being built before the
+    /// tokio_uring runtime has started.
+    pub fn register(&self) -> io::Result<()> {
+        self.raw.register()
+    }
+
+    /// Unregisters the buf_ring manually.
+    ///
+    /// This is not normally called because the drop mechanism will unregister the ring if it had
+    /// not already been unregistered.
+    ///
+    /// This function makes it possible to unregister a ring while some of its BufX buffers may
+    /// still be live. This does not result in UB. It just means the io_uring device will not have
+    /// this buf_ring pool to draw from. Put another way, the step of unregistering the ring does
+    /// not deallocate the buffers.
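+    ///
+    /// A sketch of the fully manual register/unregister flow, mirroring this crate's unit
+    /// tests (the bgid 177 is an arbitrary example value):
+    ///
+    /// ```no_run
+    /// use tokio_uring::buf::bufring;
+    ///
+    /// tokio_uring::start(async {
+    ///     let ring = bufring::Builder::new(177)
+    ///         .skip_auto_register(true)
+    ///         .build()
+    ///         .unwrap();
+    ///     ring.register().unwrap();
+    ///     // ... issue operations that draw from this provided buffer pool ...
+    ///     ring.unregister().unwrap();
+    /// });
+    /// ```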
+    pub fn unregister(&self) -> io::Result<()> {
+        self.raw.unregister()
+    }
+
+    /// (Very experimental and should probably be behind a cfg option.)
+    ///
+    /// Returns the lowest level buffer pool size that has been observed. It cannot be accurate
+    /// because it cannot take into account in-flight operations that may draw from the pool.
+    ///
+    /// This might be useful when running the system and trying to decide if the pool was sized
+    /// correctly. Maintaining this value does come with a small overhead.
+    #[allow(dead_code)]
+    pub fn possible_min(&self) -> u16 {
+        self.raw.possible_min()
+    }
+
+    /// (Very experimental and should probably be behind a cfg option.)
+    ///
+    /// Like `possible_min` but also resets the metric to the total number of buffers that had been
+    /// allocated.
+    #[allow(dead_code)]
+    pub fn possible_min_and_reset(&self) -> u16 {
+        self.raw.possible_min_and_reset()
+    }
+}

+// Methods the BufX needs.

+impl BufRing {
+    pub(crate) fn buf_capacity(&self, _: Bid) -> usize {
+        self.raw.buf_capacity_i()
+    }
+
+    pub(crate) fn stable_ptr(&self, bid: Bid) -> *const u8 {
+        // Will panic if bid is out of range.
+        self.raw.stable_ptr_i(bid)
+    }
+
+    pub(crate) fn stable_mut_ptr(&mut self, bid: Bid) -> *mut u8 {
+        // Safety: self is &mut, we're good.
+        unsafe { self.raw.stable_mut_ptr_i(bid) }
+    }
+
+    // # Safety
+    //
+    // `dropping_bid` should only be called by the buffer's drop function because once called, the
+    // buffer may be given back to the kernel for reuse.
+    pub(crate) unsafe fn dropping_bid(&self, bid: Bid) {
+        self.raw.dropping_bid_i(bid);
+    }
+}

+// Methods the io operations need.

+impl BufRing {
+    pub(crate) fn bgid(&self) -> Bgid {
+        self.raw.bgid()
+    }
+
+    // # Safety
+    //
+    // The res and flags values are used to look up a buffer and set its initialized length.
+    // The caller is responsible for these being correct. This is expected to be called
+    // when these two values are received from the kernel via a CQE and we rely on the kernel to
+    // give us correct information.
+    pub(crate) unsafe fn get_buf(&self, res: u32, flags: u32) -> io::Result<BufX> {
+        let bid = match io_uring::cqueue::buffer_select(flags) {
+            Some(bid) => bid,
+            None => {
+                // Have seen res == 0, flags == 4. 4 meaning the socket is non-empty?
+                // eprintln!("res {res}, flags {flags}");
+                //unreachable!("flags should have the IORING_CQE_F_BUFFER bit");
+
+                return Err(io::Error::new(
+                    io::ErrorKind::Other,
+                    format!(
+                        "BufRing::get_buf failed as the buffer bit, IORING_CQE_F_BUFFER, was missing from flags, res = {}, flags = {}",
+                        res, flags)
+                ));
+            }
+        };
+
+        let len = res as usize;
+
+        /*
+        let flags = flags & !io_uring::sys::IORING_CQE_F_BUFFER; // for tracing flags
+        println!(
+            "{}:{}: get_buf res({res})=len({len}) flags({:#x})->bid({bid})\n\n",
+            file!(),
+            line!(),
+            flags
+        );
+        */
+
+        assert!(len <= self.raw.buf_len);
+
+        // TODO maybe later
+        // #[cfg(any(debug, feature = "cautious"))]
+        // {
+        //     let mut debug_bitmap = self.debug_bitmap.borrow_mut();
+        //     let m = 1 << (bid % 8);
+        //     assert!(debug_bitmap[(bid / 8) as usize] & m == m);
+        //     debug_bitmap[(bid / 8) as usize] &= !m;
+        // }
+
+        self.raw.metric_getting_another();
+        /*
+        println!(
+            "{}:{}: get_buf cur {}, min {}",
+            file!(),
+            line!(),
+            self.possible_cur.get(),
+            self.possible_min.get(),
+        );
+        */
+
+        // Safety: the len provided to BufX::new is given to us from the kernel.
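+        // (The bid was found by buffer_select above, and len was just asserted to be no more
+        // than buf_len, which is each buffer's capacity, so BufX::new's contract is upheld.)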
+        Ok(unsafe { BufX::new(self.clone(), bid, len) })
+    }
+}

+#[derive(Debug, Copy, Clone)]
+/// Builds the arguments for a call to `build`, which returns a [`BufRing`].
+///
+/// Refer to the methods' descriptions for details.
+#[allow(dead_code)]
+pub struct Builder {
+    page_size: usize,
+    bgid: Bgid,
+    ring_entries: u16,
+    buf_cnt: u16,
+    buf_len: usize,
+    buf_align: usize,
+    ring_pad: usize,
+    bufend_align: usize,
+
+    skip_register: bool,
+}

+#[allow(dead_code)]
+impl Builder {
+    /// Create a new Builder with the given buffer group ID and defaults.
+    ///
+    /// The buffer group ID, `bgid`, is the id the kernel's io_uring device uses to identify the
+    /// provided buffer pool to use by operations that are posted to the device.
+    ///
+    /// The user is responsible for picking a bgid that does not conflict with other buffer groups
+    /// that have been registered with the same uring interface.
+    pub fn new(bgid: Bgid) -> Builder {
+        Builder {
+            page_size: 4096,
+            bgid,
+            ring_entries: 128,
+            buf_cnt: 0,
+            buf_len: 4096,
+            buf_align: 0,
+            ring_pad: 0,
+            bufend_align: 0,
+            skip_register: false,
+        }
+    }
+
+    /// The page size of the kernel. Defaults to 4096.
+    ///
+    /// The io_uring device requires the BufRing is allocated on the start of a page, i.e. with a
+    /// page size alignment.
+    ///
+    /// The caller should determine the page size, and may want to cache the info if multiple buf
+    /// rings are to be created. Crates are available to get this information or the user may want
+    /// to call the libc sysconf directly:
+    ///
+    ///     use libc::{_SC_PAGESIZE, sysconf};
+    ///     let page_size: usize = unsafe { sysconf(_SC_PAGESIZE) as usize };
+    pub fn page_size(mut self, page_size: usize) -> Builder {
+        self.page_size = page_size;
+        self
+    }
+
+    /// The number of ring entries to create for the buffer ring.
+    ///
+    /// This defaults to 128 or the `buf_cnt`, whichever is larger.
+    ///
+    /// The number will be made a power of 2, and will be the maximum of the ring_entries setting
+    /// and the buf_cnt setting. The interface will enforce a maximum of 2^15 (32768) so it can do
+    /// rollover calculation.
+    ///
+    /// Each ring entry is 16 bytes.
+    pub fn ring_entries(mut self, ring_entries: u16) -> Builder {
+        self.ring_entries = ring_entries;
+        self
+    }
+
+    /// The number of buffers to allocate. If left zero, the ring_entries value will be used and
+    /// that value defaults to 128.
+    pub fn buf_cnt(mut self, buf_cnt: u16) -> Builder {
+        self.buf_cnt = buf_cnt;
+        self
+    }
+
+    /// The length of each allocated buffer. Defaults to 4096.
+    ///
+    /// Non-aligned values are possible, and `buf_align` can be used to allocate each buffer on
+    /// an alignment boundary, even if the buffer length is not desired to equal the alignment.
+    pub fn buf_len(mut self, buf_len: usize) -> Builder {
+        self.buf_len = buf_len;
+        self
+    }
+
+    /// The alignment of the first buffer allocated.
+    ///
+    /// Generally not needed.
+    ///
+    /// The buffers are allocated right after the ring unless `ring_pad` is used, and generally
+    /// the buffers are allocated contiguously to one another unless the `buf_len` is set to
+    /// something different.
+    pub fn buf_align(mut self, buf_align: usize) -> Builder {
+        self.buf_align = buf_align;
+        self
+    }
+
+    /// Padding to place after the ring, to ensure separation between the ring and the first
+    /// buffer.
+    ///
+    /// Generally not needed but may be useful if the ring's end and the buffers' start are to have
+    /// some separation, perhaps for cacheline reasons.
+    pub fn ring_pad(mut self, ring_pad: usize) -> Builder {
+        self.ring_pad = ring_pad;
+        self
+    }
+
+    /// The alignment of the end of the buffer allocated. To keep other things out of a cache line
+    /// or out of a page, if that's desired.
+    pub fn bufend_align(mut self, bufend_align: usize) -> Builder {
+        self.bufend_align = bufend_align;
+        self
+    }
+
+    /// Skip automatic registration. The caller can manually invoke the buf_ring.register()
+    /// function later. Regardless, the unregister() method will be called automatically when the
+    /// BufRing goes out of scope if the caller hadn't manually called buf_ring.unregister()
+    /// already.
+    pub fn skip_auto_register(mut self, skip: bool) -> Builder {
+        self.skip_register = skip;
+        self
+    }
+
+    /// Return a BufRing, having computed the layout for the single aligned allocation
+    /// of both the buffer ring elements and the buffers themselves.
+    ///
+    /// If auto_register was left enabled, register the BufRing with the driver.
+    pub fn build(&self) -> io::Result<BufRing> {
+        let mut b: Builder = *self;
+
+        // Two cases where both buf_cnt and ring_entries are set to the max of the two.
+        if b.buf_cnt == 0 || b.ring_entries < b.buf_cnt {
+            let max = std::cmp::max(b.ring_entries, b.buf_cnt);
+            b.buf_cnt = max;
+            b.ring_entries = max;
+        }
+
+        // Don't allow the next_power_of_two calculation to be done if already larger than 2^15
+        // because 2^16 reads back as 0 in a u16. And the interface doesn't allow for ring_entries
+        // larger than 2^15 anyway, so this is a good place to catch it. Here we return a unique
+        // error that is more descriptive than the InvalidArg that would come from the interface.
+        if b.ring_entries > (1 << 15) {
+            return Err(io::Error::new(
+                io::ErrorKind::Other,
+                "ring_entries exceeded 32768",
+            ));
+        }
+
+        // A requirement of the interface is that ring_entries is a power of two, making its (and
+        // our) mask calculation trivial.
+        b.ring_entries = b.ring_entries.next_power_of_two();
+
+        Ok(BufRing {
+            raw: Rc::new(RawBufRing::new(NewArgs {
+                page_size: b.page_size,
+                bgid: b.bgid,
+                ring_entries: b.ring_entries,
+                buf_cnt: b.buf_cnt,
+                buf_len: b.buf_len,
+                buf_align: b.buf_align,
+                ring_pad: b.ring_pad,
+                bufend_align: b.bufend_align,
+                auto_register: !b.skip_register,
+            })?),
+        })
+    }
+}

+// Trivial helper struct for this module.
+struct NewArgs {
+    page_size: usize,
+    bgid: Bgid,
+    ring_entries: u16,
+    buf_cnt: u16,
+    buf_len: usize,
+    buf_align: usize,
+    ring_pad: usize,
+    bufend_align: usize,
+    auto_register: bool,
+}

+#[derive(Debug)]
+struct RawBufRing {
+    bgid: Bgid,
+
+    // Keep mask rather than ring size because mask is used often, ring size not.
+    //ring_entries: u16, // Invariants: > 0, power of 2, max 2^15 (32768).
+    ring_entries_mask: u16, // Invariant: one less than ring_entries, which is > 0, power of 2, max 2^15 (32768).
+
+    buf_cnt: u16,   // Invariants: > 0, <= ring_entries.
+    buf_len: usize, // Invariant: > 0.
+    layout: std::alloc::Layout,
+    ring_addr: *const types::BufRingEntry, // Invariant: constant.
+    buffers_addr: *mut u8,                 // Invariant: constant.
+    local_tail: Cell<u16>,
+    tail_addr: *const AtomicU16,
+    registered: Cell<bool>,
+
+    // The first `possible` field is a best effort at tracking the current buffer pool usage and
+    // from that, tracking the lowest level that has been reached. The two are an attempt at
+    // letting the user check the sizing needs of their buf_ring pool.
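+    //
+    // For example, a user sizing a pool might log `possible_min_and_reset()` periodically while
+    // under a representative load and grow `buf_cnt`/`ring_entries` if the metric ever nears
+    // zero. (That is a sizing heuristic for callers; nothing in this module does it itself.)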
+    //
+    // We don't really know how deep the uring device has gone into the pool because we never see
+    // its head value and it can be taking buffers from the ring, in-flight, while we add buffers
+    // back to the ring. All we know is when a CQE arrives and a buffer lookup is performed, a
+    // buffer has already been taken from the pool, and when the buffer is dropped, we add it back
+    // to the ring and it is about to be considered part of the pool again.
+    possible_cur: Cell<u16>,
+    possible_min: Cell<u16>,
+    //
+    // TODO maybe later
+    // #[cfg(any(debug, feature = "cautious"))]
+    // debug_bitmap: RefCell<Vec<u8>>,
+}

+impl RawBufRing {
+    fn new(new_args: NewArgs) -> io::Result<RawBufRing> {
+        #[allow(non_upper_case_globals)]
+        const trace: bool = false;
+
+        let NewArgs {
+            page_size,
+            bgid,
+            ring_entries,
+            buf_cnt,
+            buf_len,
+            buf_align,
+            ring_pad,
+            bufend_align,
+            auto_register,
+        } = new_args;
+
+        // Check that none of the important args are zero and the ring_entries is at least large
+        // enough to hold all the buffers and that ring_entries is a power of 2.
+
+        if (buf_cnt == 0)
+            || (buf_cnt > ring_entries)
+            || (buf_len == 0)
+            || ((ring_entries & (ring_entries - 1)) != 0)
+        {
+            return Err(io::Error::from(io::ErrorKind::InvalidInput));
+        }
+
+        // entry_size is 16 bytes.
+        let entry_size = std::mem::size_of::<types::BufRingEntry>();
+        let mut ring_size = entry_size * (ring_entries as usize);
+        if trace {
+            println!(
+                "{}:{}: entry_size {} * ring_entries {} = ring_size {} {:#x}",
+                file!(),
+                line!(),
+                entry_size,
+                ring_entries,
+                ring_size,
+                ring_size,
+            );
+        }
+
+        ring_size += ring_pad;
+
+        if trace {
+            println!(
+                "{}:{}: after +ring_pad {} ring_size {} {:#x}",
+                file!(),
+                line!(),
+                ring_pad,
+                ring_size,
+                ring_size,
+            );
+        }
+
+        if buf_align > 0 {
+            let buf_align = buf_align.next_power_of_two();
+            ring_size = (ring_size + (buf_align - 1)) & !(buf_align - 1);
+            if trace {
+                println!(
+                    "{}:{}: after buf_align ring_size {} {:#x}",
+                    file!(),
+                    line!(),
+                    ring_size,
+                    ring_size,
+                );
+            }
+        }
+        let buf_size = buf_len * (buf_cnt as usize);
+        assert!(ring_size != 0);
+        assert!(buf_size != 0);
+        let mut tot_size: usize = ring_size + buf_size;
+        if trace {
+            println!(
+                "{}:{}: ring_size {} {:#x} + buf_size {} {:#x} = tot_size {} {:#x}",
+                file!(),
+                line!(),
+                ring_size,
+                ring_size,
+                buf_size,
+                buf_size,
+                tot_size,
+                tot_size
+            );
+        }
+        if bufend_align > 0 {
+            // for example, if bufend_align is 4096, would make total size a multiple of pages
+            let bufend_align = bufend_align.next_power_of_two();
+            tot_size = (tot_size + (bufend_align - 1)) & !(bufend_align - 1);
+            if trace {
+                println!(
+                    "{}:{}: after bufend_align tot_size {} {:#x}",
+                    file!(),
+                    line!(),
+                    tot_size,
+                    tot_size,
+                );
+            }
+        }
+
+        let align: usize = page_size; // alignment must be at least the page size
+        let align = align.next_power_of_two();
+        let layout = std::alloc::Layout::from_size_align(tot_size, align).unwrap();
+
+        assert!(layout.size() >= ring_size);
+        // Safety: we are assured layout has nonzero size, we passed the assert just above.
+        let ring_addr: *mut u8 = unsafe { std::alloc::alloc_zeroed(layout) };
+
+        // Buffers start after the ring_size.
+        // Safety: we are assured the address and the offset are in bounds because the ring_addr is
+        // the value we got from the alloc call, and the layout.size was shown to be at least as
+        // large as the ring_size.
+        let buffers_addr: *mut u8 = unsafe { ring_addr.add(ring_size) };
+        if trace {
+            println!(
+                "{}:{}: ring_addr {} {:#x}, layout: size {} align {}",
+                file!(),
+                line!(),
+                ring_addr as u64,
+                ring_addr as u64,
+                layout.size(),
+                layout.align()
+            );
+            println!(
+                "{}:{}: buffers_addr {} {:#x}",
+                file!(),
+                line!(),
+                buffers_addr as u64,
+                buffers_addr as u64,
+            );
+        }
+
+        let ring_addr: *const types::BufRingEntry = ring_addr as _;
+
+        // Safety: the ring_addr passed into tail is the start of the ring. It is both the start of
+        // the ring and the first entry in the ring.
+        let tail_addr = unsafe { types::BufRingEntry::tail(ring_addr) } as *const AtomicU16;
+
+        let ring_entries_mask = ring_entries - 1;
+        assert!((ring_entries & ring_entries_mask) == 0);
+
+        let buf_ring = RawBufRing {
+            bgid,
+            ring_entries_mask,
+            buf_cnt,
+            buf_len,
+            layout,
+            ring_addr,
+            buffers_addr,
+            local_tail: Cell::new(0),
+            tail_addr,
+            registered: Cell::new(false),
+            possible_cur: Cell::new(0),
+            possible_min: Cell::new(buf_cnt),
+            //
+            // TODO maybe later
+            // #[cfg(any(debug, feature = "cautious"))]
+            // debug_bitmap: RefCell::new(std::vec![0; ((buf_cnt+7)/8) as usize]),
+        };
+
+        // A question had come up: where should the initial buffers be added to the ring?
+        // Here when the ring is created, even before it is registered potentially?
+        // Or after registration?
+        //
+        // For this type, BufRing, we are adding the buffers to the ring as the last part of
+        // creating the BufRing, even before registration is optionally performed.
+        //
+        // We've seen the registration be successful, even when the ring starts off empty.
+
+        // Add the buffers here where the ring is created.
+
+        for bid in 0..buf_cnt {
+            buf_ring.buf_ring_add(bid);
+        }
+        buf_ring.buf_ring_sync();
+
+        // The default is to register the buffer ring right here. There is usually no reason the
+        // caller should want to register it some time later.
+        //
+        // Perhaps the caller wants to allocate the buffer ring before the CONTEXT driver is in
+        // place - that would be a reason to delay the register call until later.
+
+        if auto_register {
+            buf_ring.register()?;
+        }
+        Ok(buf_ring)
+    }
+
+    /// Register the buffer ring with the kernel.
+    /// Normally this is done automatically when building a BufRing.
+    ///
+    /// This method must be called in the context of a `tokio-uring` runtime.
+    /// The registration persists for the lifetime of the runtime, unless
+    /// revoked by the [`unregister`] method. Dropping the
+    /// instance this method has been called on does revoke
+    /// the registration and deallocate the buffer space.
+    ///
+    /// [`unregister`]: Self::unregister
+    ///
+    /// # Errors
+    ///
+    /// If a `Provided Buffers` group with the same `bgid` is already registered, the function
+    /// returns an error.
+    fn register(&self) -> io::Result<()> {
+        let bgid = self.bgid;
+        //println!("{}:{}: register bgid {bgid}", file!(), line!());
+
+        // Future: move to separate public function so other buf_ring implementations
+        // can register, and unregister, the same way.
+
+        let res = CONTEXT.with(|x| {
+            x.handle()
+                .as_ref()
+                .expect("Not in a runtime context")
+                .register_buf_ring(self.ring_addr as _, self.ring_entries(), bgid)
+        });
+        // println!("{}:{}: res {:?}", file!(), line!(), res);
+
+        if let Err(e) = res {
+            match e.raw_os_error() {
+                Some(22) => {
+                    // using buf_ring requires kernel 5.19 or greater.
+                    // TODO turn these eprintln into a new, more expressive error being returned.
+                    // TODO what convention should we follow in this crate for adding information
+                    // onto an error?
+                    eprintln!(
+                        "buf_ring.register returned {e}, most likely indicating this kernel is not 5.19+",
+                    );
+                }
+                Some(17) => {
+                    // Registering a duplicate bgid is not allowed. There is an `unregister`
+                    // operation that can remove the first.
+                    eprintln!(
+                        "buf_ring.register returned `{e}`, indicating the attempted buffer group id {bgid} was already registered",
+                    );
+                }
+                _ => {
+                    eprintln!("buf_ring.register returned `{e}` for group id {bgid}");
+                }
+            }
+            return Err(e);
+        };
+
+        self.registered.set(true);
+
+        res
+    }
+
+    /// Unregister the buffer ring from the io_uring.
+    /// Normally this is done automatically when the BufRing goes out of scope.
+    ///
+    /// Warning: requires that the CONTEXT driver is already in place, or this will panic.
+    fn unregister(&self) -> io::Result<()> {
+        // If not registered, make this a no-op.
+        if !self.registered.get() {
+            return Ok(());
+        }
+
+        self.registered.set(false);
+
+        let bgid = self.bgid;
+
+        // If there is no context, bail out with an Ok(()) because the registration and
+        // the entire io_uring is already done anyway.
+        CONTEXT.with(|x| {
+            x.handle()
+                .as_ref()
+                .map_or(Ok(()), |handle| handle.unregister_buf_ring(bgid))
+        })
+    }
+
+    /// Returns the buffer group id.
+    #[inline]
+    fn bgid(&self) -> Bgid {
+        self.bgid
+    }
+
+    fn metric_getting_another(&self) {
+        self.possible_cur.set(self.possible_cur.get() - 1);
+        self.possible_min.set(std::cmp::min(
+            self.possible_min.get(),
+            self.possible_cur.get(),
+        ));
+    }
+
+    // # Safety
+    //
+    // Dropping a duplicate bid is likely to cause undefined behavior
+    // as the kernel uses the same buffer for different data concurrently.
+    unsafe fn dropping_bid_i(&self, bid: Bid) {
+        self.buf_ring_add(bid);
+        self.buf_ring_sync();
+    }
+
+    #[inline]
+    fn buf_capacity_i(&self) -> usize {
+        self.buf_len as _
+    }
+
+    #[inline]
+    // # Panic
+    //
+    // This function will panic if given a bid that is not within the valid range 0..self.buf_cnt.
+    fn stable_ptr_i(&self, bid: Bid) -> *const u8 {
+        assert!(bid < self.buf_cnt);
+        let offset: usize = self.buf_len * (bid as usize);
+        // Safety: buffers_addr is a u8 pointer and was part of an allocation large enough to hold
+        // buf_cnt number of buf_len buffers. buffers_addr, buf_cnt and buf_len are treated as
+        // constants and bid was just asserted to be less than buf_cnt.
+        unsafe { self.buffers_addr.add(offset) }
+    }
+
+    // # Safety
+    //
+    // This may only be called by an owned or &mut object.
+    //
+    // # Panic
+    // This will panic if bid is out of range.
+    #[inline]
+    unsafe fn stable_mut_ptr_i(&self, bid: Bid) -> *mut u8 {
+        assert!(bid < self.buf_cnt);
+        let offset: usize = self.buf_len * (bid as usize);
+        // Safety: buffers_addr is a u8 pointer and was part of an allocation large enough to hold
+        // buf_cnt number of buf_len buffers. buffers_addr, buf_cnt and buf_len are treated as
+        // constants and bid was just asserted to be less than buf_cnt.
+        self.buffers_addr.add(offset)
+    }
+
+    #[inline]
+    fn ring_entries(&self) -> u16 {
+        self.ring_entries_mask + 1
+    }
+
+    #[inline]
+    fn mask(&self) -> u16 {
+        self.ring_entries_mask
+    }
+
+    // Writes to a ring entry and updates our local copy of the tail.
+    //
+    // Adds the buffer known by its buffer id to the buffer ring. The buffer's address and length
+    // are known given its bid.
+    //
+    // This does not sync the new tail value. The caller should use `buf_ring_sync` for that.
+    //
+    // Panics if the bid is out of range.
+    fn buf_ring_add(&self, bid: Bid) {
+        // Compute address of current tail position, increment the local copy of the tail. Then
+        // write the buffer's address, length and bid into the current tail entry.
+
+        let cur_tail = self.local_tail.get();
+        self.local_tail.set(cur_tail.wrapping_add(1));
+        let ring_idx = cur_tail & self.mask();
+
+        let ring_addr = self.ring_addr as *mut types::BufRingEntry;
+
+        // Safety:
+        // 1. the pointer address (ring_addr), is set and const at self creation time,
+        //    and points to a block of memory at least as large as the number of ring_entries,
+        // 2. the mask used to create ring_idx is one less than
+        //    the number of ring_entries, and ring_entries was tested to be a power of two,
+        // So the address gotten by adding ring_idx entries to ring_addr is guaranteed to
+        // be a valid address of a ring entry.
+        let entry = unsafe { &mut *ring_addr.add(ring_idx as usize) };
+
+        entry.set_addr(self.stable_ptr_i(bid) as _);
+        entry.set_len(self.buf_len as _);
+        entry.set_bid(bid);
+
+        // Update accounting.
+        self.possible_cur.set(self.possible_cur.get() + 1);
+
+        // TODO maybe later
+        // #[cfg(any(debug, feature = "cautious"))]
+        // {
+        //     let mut debug_bitmap = self.debug_bitmap.borrow_mut();
+        //     let m = 1 << (bid % 8);
+        //     assert!(debug_bitmap[(bid / 8) as usize] & m == 0);
+        //     debug_bitmap[(bid / 8) as usize] |= m;
+        // }
+    }
+
+    // Make 'count' new buffers visible to the kernel. Called after
+    // io_uring_buf_ring_add() has been called 'count' times to fill in new
+    // buffers.
+    #[inline]
+    fn buf_ring_sync(&self) {
+        // Safety: dereferencing this raw pointer is safe. The tail_addr was computed once at init
+        // to refer to the tail address in the ring and is held const for self's lifetime.
+        unsafe {
+            (*self.tail_addr).store(self.local_tail.get(), atomic::Ordering::Release);
+        }
+        // The liburing code did io_uring_smp_store_release(&br.tail, local_tail);
+    }
+
+    // Return the possible_min buffer pool size.
+    fn possible_min(&self) -> u16 {
+        self.possible_min.get()
+    }
+
+    // Return the possible_min buffer pool size and reset to allow fresh counting going forward.
+    fn possible_min_and_reset(&self) -> u16 {
+        let res = self.possible_min.get();
+        self.possible_min.set(self.buf_cnt);
+        res
+    }
+}

+impl Drop for RawBufRing {
+    fn drop(&mut self) {
+        if self.registered.get() {
+            _ = self.unregister();
+        }
+        // Safety: the ptr and layout are treated as constant, and ptr (ring_addr) was assigned by
+        // a call to std::alloc::alloc_zeroed using the same layout.
+        unsafe { std::alloc::dealloc(self.ring_addr as *mut u8, self.layout) };
+    }
+}
diff --git a/src/buf/mod.rs b/src/buf/mod.rs
index 71ab196c..4cdcc30c 100644
--- a/src/buf/mod.rs
+++ b/src/buf/mod.rs
@@ -4,6 +4,10 @@
 //! crate defines [`IoBuf`] and [`IoBufMut`] traits which are implemented by buffer
 //! types that respect the `io-uring` contract.
 
+pub(crate) mod bufgroup;
+
+pub mod bufring;
+
 pub mod fixed;
 
 mod io_buf;
@@ -26,6 +30,6 @@ pub(crate) fn deref(buf: &impl IoBuf) -> &[u8] {
 
 pub(crate) fn deref_mut(buf: &mut impl IoBufMut) -> &mut [u8] {
     // Safety: the `IoBufMut` trait is marked as unsafe and is expected to be
-    // implemented correct.
+    // implemented correctly.
     unsafe { std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_init()) }
 }

From dbcbd8b60b4e2d828bcabf020e2efc8f536ef734 Mon Sep 17 00:00:00 2001
From: Frank Rehwinkel
Date: Mon, 20 Feb 2023 19:33:10 -0500
Subject: [PATCH 02/16] driver functions for buf_ring register and unregister

---
 src/runtime/driver/handle.rs | 15 +++++++++++++++
 src/runtime/driver/mod.rs    | 24 ++++++++++++++++++++++++
 2 files changed, 39 insertions(+)

diff --git a/src/runtime/driver/handle.rs b/src/runtime/driver/handle.rs
index ab3dcc51..3f0a3d28 100644
--- a/src/runtime/driver/handle.rs
+++ b/src/runtime/driver/handle.rs
@@ -63,6 +63,21 @@ impl Handle {
         self.inner.borrow_mut().unregister_buffers(buffers)
     }
 
+    pub(crate) fn register_buf_ring(
+        &self,
+        ring_addr: u64,
+        ring_entries: u16,
+        bgid: u16,
+    ) -> io::Result<()> {
+        self.inner
+            .borrow_mut()
+            .register_buf_ring(ring_addr, ring_entries, bgid)
+    }
+
+    pub(crate) fn unregister_buf_ring(&self, bgid: u16) -> io::Result<()> {
+        self.inner.borrow_mut().unregister_buf_ring(bgid)
+    }
+
     pub(crate) fn submit_op<T, S, F>(&self, data: T, f: F) -> io::Result<Op<T, S>>
     where
         T: Completable,
diff --git a/src/runtime/driver/mod.rs b/src/runtime/driver/mod.rs
index ab80624b..3d79d568 100644
--- a/src/runtime/driver/mod.rs
+++ b/src/runtime/driver/mod.rs
@@ -122,6 +122,30 @@ impl Driver {
         ))
     }
 
+    pub(crate) fn register_buf_ring(
+        &mut self,
+        ring_addr: u64,
+        ring_entries: u16,
+        bgid: u16,
+    ) -> io::Result<()> {
+        self.uring
+            .submitter()
+            .register_buf_ring(ring_addr, ring_entries, bgid)?;
+
+        // TODO should driver keep anything about the buf_ring?
+        // Perhaps something that maps the bgid to a creator, given a bid coming from the cqe?
+        // Or will the future that sent the command be able to convert the bid to a buffer pointer?
+        // And what if the future is dropped?
+        //self.fixed_buffers = Some(buffers);
+        Ok(())
+    }
+
+    pub(crate) fn unregister_buf_ring(&mut self, bgid: u16) -> io::Result<()> {
+        self.uring.submitter().unregister_buf_ring(bgid)?;
+
+        Ok(())
+    }
+
     pub(crate) fn submit_op<T, S, F>(
         &mut self,
         mut data: T,

From 9ccadd8de5e704c74b3b86caf942e7578f4a79e3 Mon Sep 17 00:00:00 2001
From: Frank Rehwinkel
Date: Mon, 20 Feb 2023 19:34:10 -0500
Subject: [PATCH 03/16] recv operations, normal buffer and for buf_ring

---
 src/io/recv.rs         | 66 ++++++++++++++++++++++++++++++++++++++++
 src/io/recv_provbuf.rs | 59 +++++++++++++++++++++++++++++++++++
 2 files changed, 125 insertions(+)
 create mode 100644 src/io/recv.rs
 create mode 100644 src/io/recv_provbuf.rs

diff --git a/src/io/recv.rs b/src/io/recv.rs
new file mode 100644
index 00000000..d67bf52d
--- /dev/null
+++ b/src/io/recv.rs
@@ -0,0 +1,66 @@
+use crate::buf::BoundedBufMut;
+use crate::io::SharedFd;
+use crate::BufResult;
+
+use crate::runtime::driver::op::{Completable, CqeResult, Op};
+use crate::runtime::CONTEXT;
+use std::io;
+
+pub(crate) struct Recv<T> {
+    /// Holds a strong ref to the FD, preventing the file from being closed
+    /// while the operation is in-flight.
+    #[allow(dead_code)]
+    fd: SharedFd,
+
+    /// Reference to the in-flight buffer.
+    pub(crate) buf: T,
+}
+
+impl<T: BoundedBufMut> Op<Recv<T>> {
+    // TODO remove allow
+    #[allow(dead_code)]
+    pub(crate) fn recv(fd: &SharedFd, buf: T, flags: Option<i32>) -> io::Result<Op<Recv<T>>> {
+        use io_uring::{opcode, types};
+
+        CONTEXT.with(|x| {
+            x.handle().expect("Not in a runtime context").submit_op(
+                Recv {
+                    fd: fd.clone(),
+                    buf,
+                },
+                |s| {
+                    // Get raw buffer info
+                    let ptr = s.buf.stable_mut_ptr();
+                    let len = s.buf.bytes_total();
+                    opcode::Recv::new(types::Fd(fd.raw_fd()), ptr, len as _)
+                        .flags(flags.unwrap_or(0))
+                        .build()
+                },
+            )
+        })
+    }
+}
+
+impl<T> Completable for Recv<T>
+where
+    T: BoundedBufMut,
+{
+    type Output = BufResult<usize, T>;
+
+    fn complete(self, cqe: CqeResult) -> Self::Output {
+        // Convert the operation result to `usize`
+        let res = cqe.result.map(|v| v as usize);
+        // Recover the buffer
+        let mut buf = self.buf;
+
+        // If the operation was successful, advance the initialized cursor.
+        if let Ok(n) = res {
+            // Safety: the kernel wrote `n` bytes to the buffer.
+            unsafe {
+                buf.set_init(n);
+            }
+        }
+
+        (res, buf)
+    }
+}
diff --git a/src/io/recv_provbuf.rs b/src/io/recv_provbuf.rs
new file mode 100644
index 00000000..37f7e6c2
--- /dev/null
+++ b/src/io/recv_provbuf.rs
@@ -0,0 +1,59 @@
+use crate::buf::bufgroup::BufX;
+use crate::io::SharedFd;
+
+use crate::buf::bufring::BufRing;
+use crate::runtime::driver::op::{Completable, CqeResult, Op};
+use crate::runtime::CONTEXT;
+use io_uring::squeue;
+use std::io;
+
+pub(crate) struct RecvProvBuf {
+    /// Holds a strong ref to the FD, preventing the file from being closed
+    /// while the operation is in-flight.
+    #[allow(dead_code)]
+    fd: SharedFd,
+
+    /// The bufgroup that supplies the bgid and the get_buf function.
+    group: BufRing,
+}
+
+impl Op<RecvProvBuf> {
+    pub(crate) fn recv_provbuf(
+        fd: &SharedFd,
+        group: BufRing,
+        flags: Option<i32>,
+    ) -> io::Result<Op<RecvProvBuf>> {
+        use io_uring::{opcode, types};
+
+        CONTEXT.with(|x| {
+            x.handle().expect("Not in a runtime context").submit_op(
+                RecvProvBuf {
+                    fd: fd.clone(),
+                    group,
+                },
+                |s| {
+                    // Get raw buffer info
+                    opcode::Recv::new(types::Fd(fd.raw_fd()), std::ptr::null_mut(), 0 as _)
+                        .flags(flags.unwrap_or(0))
+                        .buf_group(s.group.bgid())
+                        .build()
+                        .flags(squeue::Flags::BUFFER_SELECT)
+                },
+            )
+        })
+    }
+}
+
+impl Completable for RecvProvBuf {
+    type Output = Result<BufX, io::Error>;
+
+    fn complete(self, cqe: CqeResult) -> Self::Output {
+        let res = cqe.result?;
+        let flags = cqe.flags;
+
+        // Safety: getting a buffer from the group requires the res and flags values accurately
+        // identify a buffer and the length which was written to by the kernel. The res and flags
+        // passed here are those provided by the kernel.
+        unsafe { self.group.get_buf(res, flags) }
+    }
+}

From 300abc8e7421e399cccd2e2080e4b42889f73ad4 Mon Sep 17 00:00:00 2001
From: Frank Rehwinkel
Date: Mon, 20 Feb 2023 19:35:11 -0500
Subject: [PATCH 04/16] socket gets recv and recv_provbuf methods

---
 src/io/mod.rs    |  4 ++++
 src/io/socket.rs | 13 +++++++++++++
 2 files changed, 17 insertions(+)

diff --git a/src/io/mod.rs b/src/io/mod.rs
index ae1242be..9c556440 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -21,8 +21,12 @@ mod read_fixed;
 
 mod readv;
 
+mod recv;
+
 mod recv_from;
 
+mod recv_provbuf;
+
 mod rename_at;
 
 mod send_to;
diff --git a/src/io/socket.rs b/src/io/socket.rs
index ff183ac2..72168e9c 100644
--- a/src/io/socket.rs
+++ b/src/io/socket.rs
@@ -162,6 +162,19 @@ impl Socket {
         op.await
     }
 
+    pub(crate) async fn recv<T: BoundedBufMut>(&self, buf: T) -> crate::BufResult<usize, T> {
+        let op = Op::recv(&self.fd, buf, None).unwrap();
+        op.await
+    }
+
+    pub(crate) async fn recv_provbuf(
+        &self,
+        group: crate::buf::bufring::BufRing,
+    ) -> Result<crate::buf::bufgroup::BufX, std::io::Error> {
+        let op = Op::recv_provbuf(&self.fd, group, None).unwrap();
+        op.await
+    }
+
     pub(crate) async fn read_fixed<T>(&self, buf: T) -> crate::BufResult<usize, T>
     where
         T: BoundedBufMut,

From a5650296aa9f123bfe003d9539e153ea12e5dcfa Mon Sep 17 00:00:00 2001
From: Frank Rehwinkel
Date: Mon, 20 Feb 2023 19:36:34 -0500
Subject: [PATCH 05/16] tcp stream gets recv and recv_provbuf methods

---
 src/net/tcp/stream.rs | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs
index bc81bc8e..c97990d2 100644
--- a/src/net/tcp/stream.rs
+++ b/src/net/tcp/stream.rs
@@ -77,6 +77,24 @@ impl TcpStream {
         self.inner.read(buf).await
     }
 
+    /// Recv some data from the stream into the buffer.
+    ///
+    /// Returns the original buffer and quantity of data received.
+    pub async fn recv<T: BoundedBufMut>(&self, buf: T) -> crate::BufResult<usize, T> {
+        self.inner.recv(buf).await
+    }
+
+    /// (Experimental: type BufRing and BufX likely to change.)
+    /// Recv some data from the stream into a buffer picked from the provided buffers.
+    ///
+    /// Returns the chosen buffer.
+    pub async fn recv_provbuf(
+        &self,
+        group: crate::buf::bufring::BufRing,
+    ) -> Result<crate::buf::bufgroup::BufX, std::io::Error> {
+        self.inner.recv_provbuf(group).await
+    }
+
     /// Read some data from the stream into a registered buffer.
     ///
     /// Like [`read`], but using a pre-mapped buffer

From 0969da3eb9a3eb1840990fdbf5e4d6c15d3361b9 Mon Sep 17 00:00:00 2001
From: Frank Rehwinkel
Date: Mon, 20 Feb 2023 19:37:15 -0500
Subject: [PATCH 06/16] add unit tests at tests/net.rs

---
 tests/common/mod.rs | 299 ++++++++++++++++++++++++++++++++++++++++++++
 tests/net.rs        | 206 ++++++++++++++++++++++++++++++
 2 files changed, 505 insertions(+)
 create mode 100644 tests/common/mod.rs
 create mode 100644 tests/net.rs

diff --git a/tests/common/mod.rs b/tests/common/mod.rs
new file mode 100644
index 00000000..5d657e54
--- /dev/null
+++ b/tests/common/mod.rs
@@ -0,0 +1,299 @@
+use std::net::SocketAddr;
+
+use tokio::sync::oneshot;
+use tokio::task::JoinSet;
+use tokio_uring::buf::bufring;
+use tokio_uring::net::{TcpListener, TcpStream};
+
+#[derive(Clone)]
+pub enum Rx {
+    Read,
+    Recv,
+    RecvBufRing(bufring::BufRing),
+}
+
+pub async fn tcp_listener() -> Result<(TcpListener, SocketAddr), std::io::Error> {
+    let socket_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
+
+    let listener = TcpListener::bind(socket_addr).unwrap();
+
+    let socket_addr = listener.local_addr().unwrap();
+
+    Ok((listener, socket_addr))
+}
+
+async fn client_ping_pong(rx: Rx, stream: &TcpStream, send_cnt: usize, send_length: usize) {
+    // Implement the client ping-pong loop. Make several read variations available.
+
+    for _ in 0..send_cnt {
+        // Make this vector longer to cause splits in round trip transmission.
+        let buf = vec![1u8; send_length];
+
+        let (result, buf) = stream.write_all(buf).await;
+        let _result = result.unwrap();
+        // println!("client: written: {}", _result);
+
+        let expect = buf.len();
+        let mut got: usize = 0;
+
+        // println!("client: buf.len {}", buf.len());
+        let mut buf = buf;
+        while got < expect {
+            let result;
+
+            result = match &rx {
+                Rx::Read => {
+                    let result;
+                    (result, buf) = stream.read(buf).await;
+                    result
+                }
+                Rx::Recv => {
+                    let result;
+                    (result, buf) = stream.recv(buf).await;
+                    result
+                }
+                Rx::RecvBufRing(group) => {
+                    loop {
+                        let buf = stream.recv_provbuf(group.clone()).await;
+                        match buf {
+                            Ok(buf) => {
+                                // If returning a Vec<u8> were necessary:
+                                // Either form of conversion from BufX data to Vec<u8> could be
+                                // appropriate here. One consumes the BufX, the other doesn't
+                                // and lets it drop here.
+                                // break (Ok(buf.len()), buf.into())
+                                // break (Ok(buf.len()), buf.as_slice().to_vec());
+                                break Ok(buf.len());
+                            }
+                            Err(e) => {
+                                // Expected error: No buffer space available (os error 105),
+                                // but sometimes getting an error indicating the returned res was 0
+                                // and flags was 4.
+                                if e.kind() == std::io::ErrorKind::Other {
+                                    eprintln!(
+                                        "client: assuming connection is closed: recv_provbuf error {}",
+                                        e
+                                    );
+                                    break Err(e);
+                                }
+                                eprintln!("client: recv_provbuf error {}", e);
+                            }
+                        }
+                    }
+                }
+            };
+            let read = result.unwrap();
+            got += read;
+            // level1-println!("client: read {}", read);
+            // println!("client: read: {:?}", &_buf[..read]);
+        }
+    }
+}
+
+async fn server_ping_pong_reusing_vec(
+    rx: Rx,
+    stream: TcpStream,
+    buf: Vec<u8>,
+    _local_addr: SocketAddr,
+) {
+    use tokio_uring::buf::BoundedBuf; // for slice()
+
+    let mut buf = buf;
+    // level1-println!("server: {} connected", peer);
+    let mut _n = 0;
+
+    loop {
+        let (result, nbuf) = match &rx {
+            Rx::Read => stream.read(buf).await,
+            Rx::Recv => stream.recv(buf).await,
+            Rx::RecvBufRing(_) => unreachable!(),
+        };
+        buf = nbuf;
+        let read = result.unwrap();
+        if read == 0 {
+            // level1-println!("server: {} closed, {} total ping-ponged", peer, _n);
+            break;
+        }
+
+        let (res, slice) = stream.write_all(buf.slice(..read)).await;
+        let _ = res.unwrap();
+        buf = slice.into_inner();
+        // level1-println!("server: {} all {} bytes ping-ponged", peer, read);
+        _n += read;
+    }
+}
+
+async fn server_ping_pong_using_buf_ring(
+    stream: TcpStream,
+    group: &bufring::BufRing,
+    _local_addr: SocketAddr,
+) {
+    // Serve the connection by looping on input; each received bufx from the kernel is let go
+    // out of scope when we are done with it so it can be given back to the kernel.
+    //
+    // Here is a completion model based loop, as described in
+    // https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023
+    // where the buffer being written to by the kernel is picked by the kernel from a
+    // provided buffer pool, and when finished working with the buffer, it is returned to
+    // the kernel's provided buffer pool.
+
+    let mut _n = 0;
+    loop {
+        // Loop to allow trying again if there was no buffer space available.
+        let bufx = loop {
+            let buf = stream.recv_provbuf(group.clone()).await;
+            match buf {
+                Ok(buf) => break buf,
+                Err(e) => {
+                    // Expected error: No buffer space available (os error 105),
+                    // for which we loop around.
+                    //
+                    // But sometimes getting an error indicating the returned res was 0
+                    // and flags was 4. Treat this like the connection is closed while
+                    // awaiting confirmation from the io_uring team.
+                    if e.kind() == std::io::ErrorKind::Other {
+                        eprintln!(
+                            "server: assuming connection is closed: recv_provbuf error {}",
+                            e
+                        );
+                        return;
+                    }
+                    eprintln!("server: recv_provbuf error {}", e);
+                }
+            }
+        };
+
+        // Copy of logic above, but different buffer type.
+
+        let read = bufx.len();
+        if read == 0 {
+            // Unlikely, as the zero case seems handled by the error above.
+            break;
+        }
+
+        // Writing bufx or bufx.slice(..read) works equally well as the bufx length *is*
+        // the length that was read.
+        // let (res, _) = stream.write_all(bufx.slice(..read)).await;
+        // let (res, _) = stream.write_all(bufx).await;
+        //
+        // The returned _ represents the BufX or slice of the BufX which we let go out of scope.
+
+        let (res, _) = stream.write_all(bufx).await;
+
+        let _ = res.unwrap();
+        // level1-println!("server: {} all {} bytes ping-ponged with bufx", peer, read);
+        _n += read;
+    }
+}
+
+pub async fn async_block_ping_pong_listener_loop(server: Server, listener: TcpListener) {
+    let Server { rx } = server;
+    loop {
+        let (stream, socket_addr) = listener.accept().await.unwrap();
+        let rx = rx.clone();
+
+        // Spawn a new task for each connection
+        tokio_uring::spawn(async move {
+            match &rx {
+                Rx::Read | Rx::Recv => {
+                    let buf = vec![0u8; 16 * 1024];
+                    server_ping_pong_reusing_vec(rx, stream, buf, socket_addr).await;
+                }
+                Rx::RecvBufRing(group) => {
+                    server_ping_pong_using_buf_ring(stream, group, socket_addr).await;
+                }
+            };
+        });
+    }
+}
+
+fn spawn_ping_pong_listener_loop(server: Server, listener: TcpListener) {
+    tokio_uring::spawn(async move {
+        async_block_ping_pong_listener_loop(server, listener).await;
+    });
+}
+
+pub fn ping_pong_clients(clients: Clients, listener_addr: SocketAddr) -> oneshot::Receiver<()> {
+    // Spawn clients as tokio_uring tasks and return a tokio oneshot receiver
+    // that will indicate when they are done.
+
+    let Clients {
+        rx,
+        client_cnt,
+        send_cnt,
+        send_length,
+    } = clients;
+
+    let mut set = JoinSet::new();
+
+    // Spawn several clients
+
+    for client_id in 0..client_cnt {
+        let rx = rx.clone();
+        set.spawn_local(async move {
+            let stream = TcpStream::connect(listener_addr).await.unwrap();
+            client_ping_pong(rx, &stream, send_cnt, send_length).await;
+
+            client_id // return through handle
+        });
+    }
+
+    let (tx, rx) = oneshot::channel::<()>();
+
+    tokio_uring::spawn(async move {
+        let mut seen = vec![false; client_cnt];
+        while let Some(res) = set.join_next().await {
+            let client_id = res.unwrap();
+            seen[client_id] = true;
+        }
+
+        for i in 0..client_cnt {
+            assert!(seen[i]);
+        }
+        let _ = tx.send(()).unwrap();
+    });
+
+    rx
+}
+
+async fn _ping_pong(clients: Clients, server: Server) {
+    // Run `client_cnt` clients. Both clients and server use the TcpStream method identified by `rx`.
+
+    let (listener, listener_addr) = tcp_listener().await.unwrap();
+
+    // Spawn perhaps multiple clients
+
+    let clients_done = ping_pong_clients(clients, listener_addr);
+
+    // Spawn one server
+
+    spawn_ping_pong_listener_loop(server, listener);
+
+    // Wait until the clients signal they are done
+
+    // println!("common/mode.rs:{} now wait for clients to be done", line!());
+    let _ = clients_done.await.unwrap();
+    // println!("common/mode.rs:{} clients report being done", line!());
+}
+
+pub struct Clients {
+    pub rx: Rx,
+    pub client_cnt: usize,
+    pub send_cnt: usize,
+    pub send_length: usize,
+}
+
+pub struct Server {
+    pub rx: Rx,
+}
+
+pub struct PingPong {
+    pub clients: Clients,
+    pub server: Server,
+}
+
+impl PingPong {
+    pub async fn run(self) {
+        let PingPong { clients, server } = self;
+        _ping_pong(clients, server).await;
+    }
+}
diff --git a/tests/net.rs b/tests/net.rs
new file mode 100644
index 00000000..8b108614
--- /dev/null
+++ b/tests/net.rs
@@ -0,0 +1,206 @@
+use std::sync::mpsc::sync_channel;
+use std::thread;
+
+use tokio_uring::buf::bufring;
+
+mod common;
+
+use common::Rx;
+
+#[test]
+fn net_tcp_ping_pong_read_one() {
+    // Run one client. Both client and server use the TcpStream `read` method.
+
+    tokio_uring::start(async {
+        common::PingPong {
+            clients: common::Clients {
+                rx: Rx::Read,
+                client_cnt: 1,
+                send_cnt: 10,
+                send_length: 1024,
+            },
+            server: common::Server { rx: Rx::Read },
+        }
+        .run()
+        .await;
+    });
+}
+
+#[test]
+fn net_tcp_ping_pong_read_several() {
+    // Run 3 clients. Both clients and server use the TcpStream `read` method.
+
+    tokio_uring::start(async {
+        common::PingPong {
+            clients: common::Clients {
+                rx: Rx::Read,
+                client_cnt: 3,
+                send_cnt: 10,
+                send_length: 1024,
+            },
+            server: common::Server { rx: Rx::Read },
+        }
+        .run()
+        .await;
+    });
+}
+
+#[test]
+fn net_tcp_ping_pong_recv() {
+    // Run 3 clients. Both clients and server use the TcpStream `recv` method.
+
+    tokio_uring::start(async {
+        common::PingPong {
+            clients: common::Clients {
+                rx: Rx::Recv,
+                client_cnt: 3,
+                send_cnt: 10,
+                send_length: 1024,
+            },
+            server: common::Server { rx: Rx::Recv },
+        }
+        .run()
+        .await;
+    });
+}
+
+#[test]
+fn net_tcp_ping_pong_recv_bufring() {
+    // Run 3 clients. Both clients and server use the TcpStream `recv` method with a BufRing pool
+    // that is built small enough (4 entries) that there will be some pool exhaustion that has to
+    // be handled by retrying the requests.
+    // And a bit oddly, both clients and server are using the same BufRing, as they are all run in
+    // the same tokio_uring instance.
+
+    tokio_uring::start(async {
+        let buf_ring = bufring::Builder::new(177)
+            .ring_entries(4)
+            .buf_len(4096)
+            // Normally, there is no reason to skip the auto register, but this lets us test the
+            // manual register below.
+            .skip_auto_register(true)
+            .build()
+            .unwrap();
+
+        buf_ring.register().unwrap();
+
+        common::PingPong {
+            clients: common::Clients {
+                rx: Rx::RecvBufRing(buf_ring.clone()),
+                client_cnt: 3,
+                send_cnt: 10,
+                send_length: 1024,
+            },
+            server: common::Server {
+                rx: Rx::RecvBufRing(buf_ring.clone()),
+            },
+        }
+        .run()
+        .await;
+
+        // Manually unregistering the buf_ring. When it goes out of scope, it is unregistered
+        // automatically. Note, it remains in scope while there are outstanding buffers the
+        // application hasn't dropped yet.
+        buf_ring.unregister().unwrap();
+    });
+}
+
+#[test]
+fn net_tcp_ping_pong_recv_bufring_2_threads() {
+    // Similar to test net_tcp_ping_pong_recv_bufring above, but uses two new threads,
+    // one for the server code, one for all the clients.
+    //
+    // Two std thread syncing methods are employed: a sync_channel gets the ephemeral port from one
+    // thread back to the main thread, and the main thread then is blocked at the end, waiting for
+    // the clients thread handle to report the clients thread is done.
+    //
+    // There is no attempt to shutdown the server thread.
+    //
+    //
+    // Further details:
+    //
+    // The server thread starts a tokio_uring runtime, creates a provided buffers buf_ring,
+    // and listens on a port, spawning tasks to serve the connections being established. All
+    // server tasks share the same provided buffer pool buf_ring.
+    //
+    // The client thread also starts a tokio_uring runtime, also creates a provided buffers
+    // buf_ring, and spawns as many client tasks as the constant below dictates. Each client task
+    // uses its own Vec<u8> buffer for writing data but all share the same buf_ring for receiving
+    // data back from its stream.
+    //
+    // Minutia:
+    //
+    // The buffer group ids, bgids, assigned to each buf_ring, one for the server, one for the
+    // clients, are in independent spaces, so they could have the same value.
+    // 261 and 262, respectively, but they could both be 261. They could both be zero for that
+    // matter.
+
+    use libc::{sysconf, _SC_PAGESIZE};
+    let page_size: usize = unsafe { sysconf(_SC_PAGESIZE) as usize };
+
+    /*
+     * These yield a test run that takes about 2.8s
+    const CLIENT_CNT: usize = 32;
+    const SENDS_PER_CLIENT: usize = 64;
+    const SEND_LENGTH: usize = 64 * 1024;
+    const CLIENT_BUFRING_SIZE: u16 = 64;
+    const SERVER_BUFRING_SIZE: u16 = 64;
+    */
+    const CLIENT_CNT: usize = 4;
+    const SENDS_PER_CLIENT: usize = 4;
+    const SEND_LENGTH: usize = 4 * 1024;
+    const CLIENT_BUFRING_SIZE: u16 = 8;
+    const SERVER_BUFRING_SIZE: u16 = 8;
+    const CLIENT_BUF_LEN: usize = 4096;
+    const SERVER_BUF_LEN: usize = 4096;
+
+    // Used by the thread running the server to pass its ephemeral local port back to the main
+    // thread.
+    let (addr_tx, addr_rx) = sync_channel::<std::net::SocketAddr>(0);
+
+    let _server_handle = thread::spawn(move || {
+        tokio_uring::start(async {
+            let buf_ring = bufring::Builder::new(261)
+                .page_size(page_size)
+                .ring_entries(SERVER_BUFRING_SIZE)
+                .buf_len(SERVER_BUF_LEN)
+                .build()
+                .unwrap();
+            let server = common::Server {
+                rx: Rx::RecvBufRing(buf_ring.clone()),
+            };
+            let (listener, local_addr) = common::tcp_listener().await.unwrap();
+            addr_tx.send(local_addr).unwrap();
+
+            common::async_block_ping_pong_listener_loop(server, listener).await;
+        });
+    });
+
+    let listener_addr = addr_rx.recv().unwrap();
+
+    let clients_handle = thread::spawn(move || {
+        tokio_uring::start(async {
+            let buf_ring = bufring::Builder::new(262)
+                .page_size(page_size)
+                .ring_entries(CLIENT_BUFRING_SIZE as u16)
+                .buf_len(CLIENT_BUF_LEN)
+                .build()
+                .unwrap();
+            let clients = common::Clients {
+                rx: Rx::RecvBufRing(buf_ring.clone()),
+                client_cnt: CLIENT_CNT,
+                send_cnt: SENDS_PER_CLIENT,
+                send_length: SEND_LENGTH,
+            };
+            let clients_done = common::ping_pong_clients(clients, listener_addr);
+
+            // Wait for the client tasks to be done.
+
+            // println!("net.rs:{} now wait for clients to be done", line!());
+            let _ = clients_done.await.unwrap();
+            // println!("net.rs:{} clients report being done", line!());
+        });
+    });
+
+    // Wait for the clients thread to finish.
+    clients_handle.join().unwrap();
+}
From d40d0a7d81c1bdea4458f9244e0ba87da3c9d825 Mon Sep 17 00:00:00 2001
From: Frank Rehwinkel
Date: Tue, 21 Feb 2023 09:26:00 -0500
Subject: [PATCH 07/16] adds unit test support for probing uring feature
 support

---
 tests/common/mod.rs   |  2 ++
 tests/common/probe.rs | 33 +++++++++++++++++++++++++++++++++
 tests/net.rs          |  9 +++++++++
 3 files changed, 44 insertions(+)
 create mode 100644 tests/common/probe.rs

diff --git a/tests/common/mod.rs b/tests/common/mod.rs
index 5d657e54..7354676b 100644
--- a/tests/common/mod.rs
+++ b/tests/common/mod.rs
@@ -1,3 +1,5 @@
+pub mod probe;
+
 use std::net::SocketAddr;
 
 use tokio::sync::oneshot;
diff --git a/tests/common/probe.rs b/tests/common/probe.rs
new file mode 100644
index 00000000..f9fa5b08
--- /dev/null
+++ b/tests/common/probe.rs
@@ -0,0 +1,33 @@
+use io_uring::{opcode, IoUring, Probe};
+
+pub fn register_probe() -> Option<Probe> {
+    let io_uring = match IoUring::new(2) {
+        Ok(io_uring) => io_uring,
+        Err(_) => {
+            return None;
+        }
+    };
+    let submitter = io_uring.submitter();
+
+    let mut probe = Probe::new();
+
+    // register_probe has been available since 5.6.
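+    // On kernels that predate it, this register call simply fails, so the
+    // function returns None rather than guessing at feature support.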
+    if submitter.register_probe(&mut probe).is_err() {
+        return None;
+    }
+    return Some(probe);
+}
+
+pub fn is_kernel_minimum_5_19() -> bool {
+    let Some(register_probe) = register_probe()
+    else {
+        return false;
+    };
+
+    // IORING_OP_SOCKET was introduced in 5.19.
+    register_probe.is_supported(opcode::Socket::CODE)
+}
+
+pub fn is_buf_ring_supported() -> bool {
+    is_kernel_minimum_5_19()
+}
diff --git a/tests/net.rs b/tests/net.rs
index 8b108614..fc8de144 100644
--- a/tests/net.rs
+++ b/tests/net.rs
@@ -5,6 +5,7 @@ use tokio_uring::buf::bufring;
 
 mod common;
 
+use common::probe;
 use common::Rx;
 
 #[test]
@@ -66,6 +67,10 @@ fn net_tcp_ping_pong_recv() {
 
 #[test]
 fn net_tcp_ping_pong_recv_bufring() {
+    if !probe::is_buf_ring_supported() {
+        eprintln!("skipping test, buf_ring is not supported in this kernel");
+        return;
+    }
     // Run 3 clients. Both clients and server use the TcpStream `recv` method with a BufRing pool
     // that is built small enough (4 entries) that there will be some pool exhaustion that has to
     // be handled by retrying the requests.
@@ -107,6 +112,10 @@ fn net_tcp_ping_pong_recv_bufring() {
 
 #[test]
 fn net_tcp_ping_pong_recv_bufring_2_threads() {
+    if !probe::is_buf_ring_supported() {
+        eprintln!("skipping test, buf_ring is not supported in this kernel");
+        return;
+    }
     // Similar to the test net_tcp_ping_pong_recv_bufring above, but uses two new threads,
     // one for the server code, one for all the clients.
     //
From bd09dee64fa4eb7eedab60300055a0f0e080baa0 Mon Sep 17 00:00:00 2001
From: Frank Rehwinkel
Date: Tue, 21 Feb 2023 16:27:23 -0500
Subject: [PATCH 08/16] convert op `recv` to new UnsubmittedOneshot

---
 src/io/recv.rs        | 97 ++++++++++++++++++++-----------------------
 src/io/socket.rs      |  6 +--
 src/net/tcp/stream.rs |  2 +-
 3 files changed, 50 insertions(+), 55 deletions(-)

diff --git a/src/io/recv.rs b/src/io/recv.rs
index d67bf52d..6f40c0c2 100644
--- a/src/io/recv.rs
+++ b/src/io/recv.rs
@@ -1,66 +1,61 @@
-use crate::buf::BoundedBufMut;
-use crate::io::SharedFd;
-use crate::BufResult;
-
-use crate::runtime::driver::op::{Completable, CqeResult, Op};
-use crate::runtime::CONTEXT;
+use crate::{
+    buf::BoundedBufMut, io::SharedFd, BufResult, OneshotOutputTransform, UnsubmittedOneshot,
+};
+use io_uring::cqueue::Entry;
 use std::io;
+use std::marker::PhantomData;
+
+/// An unsubmitted recv operation.
+pub type UnsubmittedRecv<T> = UnsubmittedOneshot<RecvData<T>, RecvTransform<T>>;
 
-pub(crate) struct Recv<T> {
+#[allow(missing_docs)]
+pub struct RecvData<T> {
     /// Holds a strong ref to the FD, preventing the file from being closed
     /// while the operation is in-flight.
-    #[allow(dead_code)]
-    fd: SharedFd,
+    _fd: SharedFd,
 
-    /// Reference to the in-flight buffer.
-    pub(crate) buf: T,
+    buf: T,
 }
 
-impl<T: BoundedBufMut> Op<Recv<T>> {
-    // TODO remove allow
-    #[allow(dead_code)]
-    pub(crate) fn recv(fd: &SharedFd, buf: T, flags: Option<i32>) -> io::Result<Op<Recv<T>>> {
-        use io_uring::{opcode, types};
-
-        CONTEXT.with(|x| {
-            x.handle().expect("Not in a runtime context").submit_op(
-                Recv {
-                    fd: fd.clone(),
-                    buf,
-                },
-                |s| {
-                    // Get raw buffer info
-                    let ptr = s.buf.stable_mut_ptr();
-                    let len = s.buf.bytes_total();
-                    opcode::Recv::new(types::Fd(fd.raw_fd()), ptr, len as _)
-                        .flags(flags.unwrap_or(0))
-                        .build()
-                },
-            )
-        })
-    }
+#[allow(missing_docs)]
+pub struct RecvTransform<T> {
+    _phantom: PhantomData<T>,
 }
 
-impl<T> Completable for Recv<T>
-where
-    T: BoundedBufMut,
-{
+impl<T> OneshotOutputTransform for RecvTransform<T> {
     type Output = BufResult<usize, T>;
+    type StoredData = RecvData<T>;
 
-    fn complete(self, cqe: CqeResult) -> Self::Output {
-        // Convert the operation result to `usize`
-        let res = cqe.result.map(|v| v as usize);
-        // Recover the buffer
-        let mut buf = self.buf;
+    fn transform_oneshot_output(self, data: Self::StoredData, cqe: Entry) -> Self::Output {
+        let res = if cqe.result() >= 0 {
+            Ok(cqe.result() as usize)
+        } else {
+            Err(io::Error::from_raw_os_error(cqe.result()))
+        };
 
-        // If the operation was successful, advance the initialized cursor.
-        if let Ok(n) = res {
-            // Safety: the kernel wrote `n` bytes to the buffer.
-            unsafe {
-                buf.set_init(n);
-            }
-        }
+        (res, data.buf)
+    }
+}
+
+impl<T: BoundedBufMut> UnsubmittedRecv<T> {
+    pub(crate) fn recv(fd: &SharedFd, mut buf: T, flags: Option<i32>) -> Self {
+        use io_uring::{opcode, types};
 
-        (res, buf)
+        // Get raw buffer info
+        let ptr = buf.stable_mut_ptr();
+        let len = buf.bytes_init();
+
+        Self::new(
+            RecvData {
+                _fd: fd.clone(),
+                buf,
+            },
+            RecvTransform {
+                _phantom: PhantomData::default(),
+            },
+            opcode::Recv::new(types::Fd(fd.raw_fd()), ptr, len as _)
+                .flags(flags.unwrap_or(0))
+                .build(),
+        )
     }
 }
diff --git a/src/io/socket.rs b/src/io/socket.rs
index 982d3128..b93812ac 100644
--- a/src/io/socket.rs
+++ b/src/io/socket.rs
@@ -1,3 +1,4 @@
+use crate::io::recv::UnsubmittedRecv;
 use crate::io::write::UnsubmittedWrite;
 use crate::runtime::driver::op::Op;
 use crate::{
@@ -163,9 +164,8 @@ impl Socket {
         op.await
     }
 
-    pub(crate) async fn recv<T: BoundedBufMut>(&self, buf: T) -> crate::BufResult<usize, T> {
-        let op = Op::recv(&self.fd, buf, None).unwrap();
-        op.await
+    pub(crate) fn recv<T: BoundedBufMut>(&self, buf: T) -> UnsubmittedRecv<T> {
+        UnsubmittedOneshot::recv(&self.fd, buf, None)
     }
 
     pub(crate) async fn recv_provbuf(
diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs
index 75a9a221..4e8b05ca 100644
--- a/src/net/tcp/stream.rs
+++ b/src/net/tcp/stream.rs
@@ -82,7 +82,7 @@ impl TcpStream {
     ///
     /// Returns the original buffer and quantity of data received.
     pub async fn recv<T: BoundedBufMut>(&self, buf: T) -> crate::BufResult<usize, T> {
-        self.inner.recv(buf).await
+        self.inner.recv(buf).submit().await
     }
 
     /// (Experimental: type BufRing and BufX likely to change.)
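An aside on the converted API before the next patch: from the caller's side the
oneshot conversion is invisible, because the public wrapper submits the op itself.
A minimal sketch (the function name `recv_once` is invented for illustration, and
an already-connected stream is assumed):

```rust
use tokio_uring::net::TcpStream;

// Sketch: receive once into a fresh Vec and report the byte count.
async fn recv_once(stream: &TcpStream) -> std::io::Result<usize> {
    let buf = vec![0u8; 4096];
    // TcpStream::recv builds the UnsubmittedOneshot and drives submit().await
    // internally; the buffer is always handed back alongside the result.
    let (res, _buf) = stream.recv(buf).await;
    res
}
```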
From 2029a2080c87c804afebd9556a2b9650a8c7930b Mon Sep 17 00:00:00 2001
From: Frank Rehwinkel
Date: Tue, 21 Feb 2023 16:59:16 -0500
Subject: [PATCH 09/16] convert op `recv_provbuf` to new UnsubmittedOneshot

---
 src/io/recv_provbuf.rs | 88 +++++++++++++++++++++++-------------------
 src/io/socket.rs       |  8 ++--
 src/net/tcp/stream.rs  |  2 +-
 3 files changed, 53 insertions(+), 45 deletions(-)

diff --git a/src/io/recv_provbuf.rs b/src/io/recv_provbuf.rs
index 37f7e6c2..68b2cd5b 100644
--- a/src/io/recv_provbuf.rs
+++ b/src/io/recv_provbuf.rs
@@ -1,59 +1,67 @@
-use crate::buf::bufgroup::BufX;
-use crate::io::SharedFd;
-
-use crate::buf::bufring::BufRing;
-use crate::runtime::driver::op::{Completable, CqeResult, Op};
-use crate::runtime::CONTEXT;
-use io_uring::squeue;
+use crate::{
+    buf::{bufgroup::BufX, bufring::BufRing},
+    io::SharedFd,
+    OneshotOutputTransform, UnsubmittedOneshot,
+};
+use io_uring::{cqueue::Entry, squeue};
 use std::io;
 
-pub(crate) struct RecvProvBuf {
+/// An unsubmitted recv_provbuf operation.
+pub type UnsubmittedRecvProvBuf = UnsubmittedOneshot<RecvProvBufData, RecvProvBufTransform>;
+
+#[allow(missing_docs)]
+pub struct RecvProvBufData {
     /// Holds a strong ref to the FD, preventing the file from being closed
     /// while the operation is in-flight.
-    #[allow(dead_code)]
-    fd: SharedFd,
+    _fd: SharedFd,
 
     /// The bufgroup that supplies the bgid and the get_buf function.
     group: BufRing,
 }
 
-impl Op<RecvProvBuf> {
-    pub(crate) fn recv_provbuf(
-        fd: &SharedFd,
-        group: BufRing,
-        flags: Option<i32>,
-    ) -> io::Result<Op<RecvProvBuf>> {
-        use io_uring::{opcode, types};
-
-        CONTEXT.with(|x| {
-            x.handle().expect("Not in a runtime context").submit_op(
-                RecvProvBuf {
-                    fd: fd.clone(),
-                    group,
-                },
-                |s| {
-                    // Get raw buffer info
-                    opcode::Recv::new(types::Fd(fd.raw_fd()), std::ptr::null_mut(), 0 as _)
-                        .flags(flags.unwrap_or(0))
-                        .buf_group(s.group.bgid())
-                        .build()
-                        .flags(squeue::Flags::BUFFER_SELECT)
-                },
-            )
-        })
-    }
+#[allow(missing_docs)]
+pub struct RecvProvBufTransform {
+    // _phantom: PhantomData<T>,
 }
 
-impl Completable for RecvProvBuf {
+impl OneshotOutputTransform for RecvProvBufTransform {
     type Output = Result<BufX, io::Error>;
+    type StoredData = RecvProvBufData;
 
-    fn complete(self, cqe: CqeResult) -> Self::Output {
-        let res = cqe.result?;
-        let flags = cqe.flags;
+    fn transform_oneshot_output(self, data: Self::StoredData, cqe: Entry) -> Self::Output {
+        let res = if cqe.result() >= 0 {
+            cqe.result() as u32
+        } else {
+            return Err(io::Error::from_raw_os_error(cqe.result()));
+        };
+        let flags = cqe.flags();
 
         // Safety: getting a buffer from the group requires the res and flags values accurately
         // indentify a buffer and the length which was written to by the kernel. The res and flags
         // passed here are those provided by the kernel.
-        unsafe { self.group.get_buf(res, flags) }
+        unsafe { data.group.get_buf(res, flags) }
+    }
+}
+
+impl UnsubmittedRecvProvBuf {
+    pub(crate) fn recv_provbuf(fd: &SharedFd, group: BufRing, flags: Option<i32>) -> Self {
+        use io_uring::{opcode, types};
+
+        let bgid = group.bgid();
+
+        Self::new(
+            RecvProvBufData {
+                _fd: fd.clone(),
+                group,
+            },
+            RecvProvBufTransform {
+                // _phantom: PhantomData::default(),
+            },
+            opcode::Recv::new(types::Fd(fd.raw_fd()), std::ptr::null_mut(), 0 as _)
+                .flags(flags.unwrap_or(0))
+                .buf_group(bgid)
+                .build()
+                .flags(squeue::Flags::BUFFER_SELECT),
+        )
     }
 }
diff --git a/src/io/socket.rs b/src/io/socket.rs
index b93812ac..ab4ba6c7 100644
--- a/src/io/socket.rs
+++ b/src/io/socket.rs
@@ -1,4 +1,5 @@
 use crate::io::recv::UnsubmittedRecv;
+use crate::io::recv_provbuf::UnsubmittedRecvProvBuf;
 use crate::io::write::UnsubmittedWrite;
 use crate::runtime::driver::op::Op;
 use crate::{
@@ -168,12 +169,11 @@ impl Socket {
         UnsubmittedOneshot::recv(&self.fd, buf, None)
     }
 
-    pub(crate) async fn recv_provbuf(
+    pub(crate) fn recv_provbuf(
        &self,
        group: crate::buf::bufring::BufRing,
-    ) -> Result<crate::buf::bufgroup::BufX, io::Error> {
-        let op = Op::recv_provbuf(&self.fd, group, None).unwrap();
-        op.await
+    ) -> UnsubmittedRecvProvBuf {
+        UnsubmittedOneshot::recv_provbuf(&self.fd, group, None)
     }
 
     pub(crate) async fn read_fixed<T>(&self, buf: T) -> crate::BufResult<usize, T>
diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs
index 4e8b05ca..932f6c40 100644
--- a/src/net/tcp/stream.rs
+++ b/src/net/tcp/stream.rs
@@ -93,7 +93,7 @@ impl TcpStream {
         &self,
         group: crate::buf::bufring::BufRing,
     ) -> Result<crate::buf::bufgroup::BufX, io::Error> {
-        self.inner.recv_provbuf(group).await
+        self.inner.recv_provbuf(group).submit().await
     }
 
     /// Read some data from the stream into a registered buffer.
From 0f940c06d0624334d3ffbbf61f210e42011b6454 Mon Sep 17 00:00:00 2001
From: Frank Rehwinkel
Date: Tue, 21 Feb 2023 23:30:06 -0500
Subject: [PATCH 10/16] typo

---
 src/io/recv_provbuf.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/io/recv_provbuf.rs b/src/io/recv_provbuf.rs
index 68b2cd5b..89a379e7 100644
--- a/src/io/recv_provbuf.rs
+++ b/src/io/recv_provbuf.rs
@@ -37,7 +37,7 @@ impl OneshotOutputTransform for RecvProvBufTransform {
         let flags = cqe.flags();
 
         // Safety: getting a buffer from the group requires the res and flags values accurately
-        // indentify a buffer and the length which was written to by the kernel. The res and flags
+        // identify a buffer and the length which was written to by the kernel. The res and flags
         // passed here are those provided by the kernel.
         unsafe { data.group.get_buf(res, flags) }
     }
From 9387c92c98138451f7d760432a04b0b95a406f22 Mon Sep 17 00:00:00 2001
From: Frank Rehwinkel
Date: Fri, 24 Feb 2023 12:05:34 -0500
Subject: [PATCH 11/16] handle no-more-data case with a None, rather than an
 io::Error

---
 src/buf/bufring/ring.rs | 13 +++++----
 src/io/recv_provbuf.rs  |  4 +--
 src/io/socket.rs        |  7 +++--
 src/net/tcp/stream.rs   | 55 +++++++++++++++++++++++++++++-----
 tests/common/mod.rs     | 65 +++++++++++++++++++++++++++------------
 tests/net.rs            | 16 +++++-----
 6 files changed, 115 insertions(+), 45 deletions(-)

diff --git a/src/buf/bufring/ring.rs b/src/buf/bufring/ring.rs
index 03790a6f..0d0f1e81 100644
--- a/src/buf/bufring/ring.rs
+++ b/src/buf/bufring/ring.rs
@@ -150,13 +150,16 @@ impl BufRing {
     // The caller is responsible for these being correct. This is expected to be called
     // when these two values are received from the kernel via a CQE and we rely on the kernel to
     // give us correct information.
- pub(crate) unsafe fn get_buf(&self, res: u32, flags: u32) -> io::Result { + pub(crate) unsafe fn get_buf(&self, res: u32, flags: u32) -> io::Result> { let bid = match io_uring::cqueue::buffer_select(flags) { Some(bid) => bid, None => { - // Have seen res == 0, flags == 4. 4 meaning socket is non empty? - // eprintln!("res {res}, flags {flags}"); - //unreachable!("flags should have the IORING_CQE_F_BUFFER bit"); + // Have seen res == 0, flags == 4 with a TCP socket. res == 0 we take to mean the + // socket is empty so return None to show there is no buffer returned, which should + // be interpreted to mean there is no more data to read from this file or socket. + if res == 0 { + return Ok(None); + } return Err(io::Error::new( io::ErrorKind::Other, @@ -202,7 +205,7 @@ impl BufRing { */ // Safety: the len provided to BufX::new is given to us from the kernel. - Ok(unsafe { BufX::new(self.clone(), bid, len) }) + Ok(Some(unsafe { BufX::new(self.clone(), bid, len) })) } } diff --git a/src/io/recv_provbuf.rs b/src/io/recv_provbuf.rs index 89a379e7..25ceb631 100644 --- a/src/io/recv_provbuf.rs +++ b/src/io/recv_provbuf.rs @@ -25,14 +25,14 @@ pub struct RecvProvBufTransform { } impl OneshotOutputTransform for RecvProvBufTransform { - type Output = Result; + type Output = Result, io::Error>; type StoredData = RecvProvBufData; fn transform_oneshot_output(self, data: Self::StoredData, cqe: Entry) -> Self::Output { let res = if cqe.result() >= 0 { cqe.result() as u32 } else { - return Err(io::Error::from_raw_os_error(cqe.result())); + return Err(io::Error::from_raw_os_error(-cqe.result())); }; let flags = cqe.flags(); diff --git a/src/io/socket.rs b/src/io/socket.rs index ab4ba6c7..0dea0f35 100644 --- a/src/io/socket.rs +++ b/src/io/socket.rs @@ -165,15 +165,16 @@ impl Socket { op.await } - pub(crate) fn recv(&self, buf: T) -> UnsubmittedRecv { - UnsubmittedOneshot::recv(&self.fd, buf, None) + pub(crate) fn recv(&self, buf: T, flags: Option) -> UnsubmittedRecv { + UnsubmittedOneshot::recv(&self.fd, buf, flags) } pub(crate) fn recv_provbuf( &self, group: crate::buf::bufring::BufRing, + flags: Option, ) -> UnsubmittedRecvProvBuf { - UnsubmittedOneshot::recv_provbuf(&self.fd, group, None) + UnsubmittedOneshot::recv_provbuf(&self.fd, group, flags) } pub(crate) async fn read_fixed(&self, buf: T) -> crate::BufResult diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 932f6c40..317f8772 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -4,6 +4,7 @@ use std::{ os::unix::prelude::{AsRawFd, FromRawFd, RawFd}, }; +use crate::buf::{bufgroup::BufX, bufring}; use crate::{ buf::fixed::FixedBuf, buf::{BoundedBuf, BoundedBufMut}, @@ -78,22 +79,60 @@ impl TcpStream { self.inner.read(buf).await } - /// Recv some data from the stream into the buffer. + /// Creates a oneshot recv(2) operation using the provided buffer. /// /// Returns the original buffer and quantity of data received. - pub async fn recv(&self, buf: T) -> crate::BufResult { - self.inner.recv(buf).submit().await + pub async fn recv( + &self, + buf: T, + flags: Option, + ) -> crate::BufResult { + self.inner.recv(buf, flags).submit().await } + /// Creates a oneshot recv(2) operation using the provided buf_ring pool. + /// /// (Experimental: type BufRing and BufX likely to change.) - /// Recv some data from the stream into a buffer picked from the provided buffers. /// - /// Returns the chosen buffer. + /// Returns a Result, io::Error> meaning it returns Ok(None) when there is no more + /// data to read. 
+ /// + /// When the buffer goes out of scope, it is returned to the buf_ring pool. + /// + /// Refer to io_uring_prep_recv(3) for a description of 'flags'. + /// + /// # Examples + /// + /// ```no_run + /// use tokio_uring::buf::bufring; + /// use tokio_uring::net::TcpStream; + /// + /// async fn recv_once( + /// stream: TcpStream, + /// group: bufring::BufRing, + /// flags: Option, + /// ) -> Result<(), std::io::Error> { + /// let bufx = match stream.recv_provbuf(group, flags).await { + /// Ok(Some(bufx)) => bufx, + /// Ok(None) => return Ok(()), + /// Err(e) => return Err(e), + /// }; + /// + /// let read = bufx.len(); + /// if read == 0 { + /// unreachable!(); + /// } + /// + /// // use bufx .. + /// Ok(()) + /// } + /// ``` pub async fn recv_provbuf( &self, - group: crate::buf::bufring::BufRing, - ) -> Result { - self.inner.recv_provbuf(group).submit().await + group: bufring::BufRing, + flags: Option, + ) -> Result, io::Error> { + self.inner.recv_provbuf(group, flags).submit().await } /// Read some data from the stream into a registered buffer. diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 7354676b..2c85329e 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -10,8 +10,8 @@ use tokio_uring::net::{TcpListener, TcpStream}; #[derive(Clone)] pub enum Rx { Read, - Recv, - RecvBufRing(bufring::BufRing), + Recv(Option), + RecvBufRing(bufring::BufRing, Option), } pub async fn tcp_listener() -> Result<(TcpListener, SocketAddr), std::io::Error> { @@ -24,6 +24,11 @@ pub async fn tcp_listener() -> Result<(TcpListener, SocketAddr), std::io::Error> Ok((listener, socket_addr)) } +#[inline] +pub fn is_no_buffer_space_available(e: &std::io::Error) -> bool { + e.raw_os_error() == Some(105) +} + async fn client_ping_pong(rx: Rx, stream: &TcpStream, send_cnt: usize, send_length: usize) { // Implement client ping-pong loop. Make several read variations available. @@ -33,12 +38,10 @@ async fn client_ping_pong(rx: Rx, stream: &TcpStream, send_cnt: usize, send_leng let (result, buf) = stream.write_all(buf).await; let _result = result.unwrap(); - // println!("client: written: {}", _result); let expect = buf.len(); let mut got: usize = 0; - // println!("client: buf.len {}", buf.len()); let mut buf = buf; while got < expect { let result; @@ -49,16 +52,16 @@ async fn client_ping_pong(rx: Rx, stream: &TcpStream, send_cnt: usize, send_leng (result, buf) = stream.read(buf).await; result } - Rx::Recv => { + Rx::Recv(flags) => { let result; - (result, buf) = stream.recv(buf).await; + (result, buf) = stream.recv(buf, *flags).await; result } - Rx::RecvBufRing(group) => { + Rx::RecvBufRing(group, flags) => { loop { - let buf = stream.recv_provbuf(group.clone()).await; + let buf = stream.recv_provbuf(group.clone(), *flags).await; match buf { - Ok(buf) => { + Ok(Some(buf)) => { // If returning a Vec were necessary: // Either form of conversion from Bufx data to Vec could be appropriate here. // One consumes the BufX, the other doesn't and let's it drop here. @@ -66,8 +69,13 @@ async fn client_ping_pong(rx: Rx, stream: &TcpStream, send_cnt: usize, send_leng // break (Ok(buf.len()), buf.as_slice().to_vec()); break Ok(buf.len()); } + Ok(None) => { + // The connection is closed. Report 0 bytes read. + break Ok(0); + } Err(e) => { // Expected error: No buffer space available (os error 105) + // but sometimes getting error indicating the returned res was 0 // and flags was 4. 
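+                            // (A flags value of 4 matches IORING_CQE_F_SOCK_NONEMPTY on
+                            // 5.19+ kernels, which may be the explanation being awaited.)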
if e.kind() == std::io::ErrorKind::Other { @@ -77,13 +85,22 @@ async fn client_ping_pong(rx: Rx, stream: &TcpStream, send_cnt: usize, send_leng ); break Err(e); } - eprintln!("client: recv_provbuf error {}", e); + // Normal for some of the tests cases to cause the bufring to be exhuasted. + if !is_no_buffer_space_available(&e) { + panic!("client: recv_provbuf error {}", e); + } } } } } }; let read = result.unwrap(); + if read == 0 { + panic!( + "read of 0 but expected not yet reached, got {}, expected {}", + got, expect + ); + } got += read; // level1-println!("client: read {}", read); // println!("client: read: {:?}", &_buf[..read]); @@ -106,8 +123,8 @@ async fn server_ping_pong_reusing_vec( loop { let (result, nbuf) = match &rx { Rx::Read => stream.read(buf).await, - Rx::Recv => stream.recv(buf).await, - Rx::RecvBufRing(_) => unreachable!(), + Rx::Recv(flags) => stream.recv(buf, *flags).await, + Rx::RecvBufRing(_, _) => unreachable!(), }; buf = nbuf; let read = result.unwrap(); @@ -127,6 +144,7 @@ async fn server_ping_pong_reusing_vec( async fn server_ping_pong_using_buf_ring( stream: TcpStream, group: &bufring::BufRing, + flags: Option, _local_addr: SocketAddr, ) { // Serve the connection by looping on input, each received bufx from the kernel which @@ -142,9 +160,15 @@ async fn server_ping_pong_using_buf_ring( loop { // Loop to allow trying again if there was no buffer space available. let bufx = loop { - let buf = stream.recv_provbuf(group.clone()).await; + let buf = stream.recv_provbuf(group.clone(), flags).await; match buf { - Ok(buf) => break buf, + Ok(Some(buf)) => break buf, + Ok(None) => { + // Normal that the client closed its connection and this + // server sees no more data is forthcoming. So the read + // amount was zero, so there was no buffer picked. + return; + } Err(e) => { // Expected error: No buffer space available (os error 105), // for which we loop around. @@ -154,12 +178,15 @@ async fn server_ping_pong_using_buf_ring( // awaiting confirmation from the io_uring team. if e.kind() == std::io::ErrorKind::Other { eprintln!( - "server: assuming connection is closed: ecv_provbuf error {}", + "server: assuming connection is closed: recv_provbuf error {}", e ); return; } - eprintln!("server: recv_provbuf error {}", e); + // Normal for some of the tests cases to cause the bufring to be exhuasted. 
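+                            // (Error 105 is ENOBUFS, the code that the
+                            // is_no_buffer_space_available helper matches on.)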
+ if !is_no_buffer_space_available(&e) { + panic!("server: recv_provbuf error {}", e); + } } } }; @@ -196,12 +223,12 @@ pub async fn async_block_ping_pong_listener_loop(server: Server, listener: TcpLi // Spawn new task for each connnection tokio_uring::spawn(async move { match &rx { - Rx::Read | Rx::Recv => { + Rx::Read | Rx::Recv(_) => { let buf = vec![0u8; 16 * 1024]; server_ping_pong_reusing_vec(rx, stream, buf, socket_addr).await; } - Rx::RecvBufRing(group) => { - server_ping_pong_using_buf_ring(stream, group, socket_addr).await; + Rx::RecvBufRing(group, flags) => { + server_ping_pong_using_buf_ring(stream, group, *flags, socket_addr).await; } }; }); diff --git a/tests/net.rs b/tests/net.rs index fc8de144..32d1d505 100644 --- a/tests/net.rs +++ b/tests/net.rs @@ -53,12 +53,12 @@ fn net_tcp_ping_pong_recv() { tokio_uring::start(async { common::PingPong { clients: common::Clients { - rx: Rx::Recv, + rx: Rx::Recv(None), client_cnt: 3, send_cnt: 10, send_length: 1024, }, - server: common::Server { rx: Rx::Recv }, + server: common::Server { rx: Rx::Recv(None) }, } .run() .await; @@ -66,7 +66,7 @@ fn net_tcp_ping_pong_recv() { } #[test] -fn net_tcp_ping_pong_recv_bufring() { +fn net_tcp_ping_pong_recv_oneshot_bufring() { if !probe::is_buf_ring_supported() { eprintln!("skipping test, buf_ring is not supported in this kernel"); return; @@ -91,13 +91,13 @@ fn net_tcp_ping_pong_recv_bufring() { common::PingPong { clients: common::Clients { - rx: Rx::RecvBufRing(buf_ring.clone()), + rx: Rx::RecvBufRing(buf_ring.clone(), None), client_cnt: 3, send_cnt: 10, send_length: 1024, }, server: common::Server { - rx: Rx::RecvBufRing(buf_ring.clone()), + rx: Rx::RecvBufRing(buf_ring.clone(), None), }, } .run() @@ -111,7 +111,7 @@ fn net_tcp_ping_pong_recv_bufring() { } #[test] -fn net_tcp_ping_pong_recv_bufring_2_threads() { +fn net_tcp_ping_pong_recv_oneshot_bufring_2_threads() { if !probe::is_buf_ring_supported() { eprintln!("skipping test, buf_ring is not supported in this kernel"); return; @@ -175,7 +175,7 @@ fn net_tcp_ping_pong_recv_bufring_2_threads() { .build() .unwrap(); let server = common::Server { - rx: Rx::RecvBufRing(buf_ring.clone()), + rx: Rx::RecvBufRing(buf_ring.clone(), None), }; let (listener, local_addr) = common::tcp_listener().await.unwrap(); addr_tx.send(local_addr).unwrap(); @@ -195,7 +195,7 @@ fn net_tcp_ping_pong_recv_bufring_2_threads() { .build() .unwrap(); let clients = common::Clients { - rx: Rx::RecvBufRing(buf_ring.clone()), + rx: Rx::RecvBufRing(buf_ring.clone(), None), client_cnt: CLIENT_CNT, send_cnt: SENDS_PER_CLIENT, send_length: SEND_LENGTH, From d885aa7365f9cf0f2fa21f59ca8beb8862c66850 Mon Sep 17 00:00:00 2001 From: Frank Rehwinkel Date: Fri, 24 Feb 2023 12:12:52 -0500 Subject: [PATCH 12/16] fix doc --- src/net/tcp/stream.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 317f8772..7b4dca2f 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -94,8 +94,8 @@ impl TcpStream { /// /// (Experimental: type BufRing and BufX likely to change.) /// - /// Returns a Result, io::Error> meaning it returns Ok(None) when there is no more - /// data to read. + /// Returns Result\, io::Error> meaning it returns Ok(None) + /// when there is no more data to read. /// /// When the buffer goes out of scope, it is returned to the buf_ring pool. 
/// From f6eafdbeaefedc4b90f57532ad16903457b70661 Mon Sep 17 00:00:00 2001 From: Frank Rehwinkel Date: Sun, 26 Feb 2023 09:14:20 -0500 Subject: [PATCH 13/16] fix cqe.result negation in recv.rs too --- src/io/recv.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/io/recv.rs b/src/io/recv.rs index 6f40c0c2..456fdc47 100644 --- a/src/io/recv.rs +++ b/src/io/recv.rs @@ -30,7 +30,7 @@ impl OneshotOutputTransform for RecvTransform { let res = if cqe.result() >= 0 { Ok(cqe.result() as usize) } else { - Err(io::Error::from_raw_os_error(cqe.result())) + Err(io::Error::from_raw_os_error(-cqe.result())) }; (res, data.buf) From be7cdd4912ead4bf31a32799138c5ab8f536c6ae Mon Sep 17 00:00:00 2001 From: Frank Rehwinkel Date: Sun, 26 Feb 2023 09:21:14 -0500 Subject: [PATCH 14/16] op: add recv_multi The first operation that supports streaming CQE results. Adds a Streamable trait, along the lines of the Completable trait. Comes with stream_next and stream_complete methods. Adds MultiCQEStream struct, along the lines of the MultiCQEFuture struct. Adds a submit_op_stream along the lines of submit_op. Adds poll_next_op along the lines of poll_op. The Lifecycle gets two additional methods: take_index, and data. take_index: returns the Lifecycle index and replaces it with usize::MAX to show the Lifecycle is no longer represented in the slab. This feature only used by the new poll_next_op. data: returns a reference to the Lifecycle's data, to be able to use it for the Streamable's stream_next calls which only require a reference. Ownership is still transferred in stream_complete. Adds the io/recv_multi operation. The tcp stream recv_multi is a function that bridges the private types with a public function. There is no cancel yet for the multishot command but the code can be written to break out of the loop. Also, when the connection is closed, the command should fail. It's not tested, but unregistering the buf_ring might cancel the command too - but maybe not. 
Unit tests: net unit tests and helper functions return io::Return add recv_multi cases add BufRingProps to net unit tests - to quiet empty buffer warnings which are intentional in some tests --- Cargo.toml | 3 + src/io/mod.rs | 2 + src/io/recv_multi.rs | 87 +++++++ src/io/socket.rs | 9 + src/net/tcp/stream.rs | 59 +++++ src/runtime/driver/handle.rs | 25 +- src/runtime/driver/mod.rs | 127 +++++++++- src/runtime/driver/op/mod.rs | 55 +++- tests/common/mod.rs | 473 +++++++++++++++++++++++++++-------- tests/net.rs | 283 ++++++++++++++++++--- 10 files changed, 986 insertions(+), 137 deletions(-) create mode 100644 src/io/recv_multi.rs diff --git a/Cargo.toml b/Cargo.toml index 8812537f..96523497 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,12 +18,15 @@ keywords = ["async", "fs", "io-uring"] [dependencies] tokio = { version = "1.2", features = ["net", "rt", "sync"] } +tokio-stream = "0.1.12" slab = "0.4.2" libc = "0.2.80" io-uring = "0.5.13" socket2 = { version = "0.4.4", features = ["all"] } bytes = { version = "1.0", optional = true } +async-stream = "0.3.4" futures = "0.3.26" +futures-core = "0.3.26" [dev-dependencies] tempfile = "3.2.0" diff --git a/src/io/mod.rs b/src/io/mod.rs index 8fe4b03d..8056a43a 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -25,6 +25,8 @@ mod recv; mod recv_from; +mod recv_multi; + mod recv_provbuf; mod rename_at; diff --git a/src/io/recv_multi.rs b/src/io/recv_multi.rs new file mode 100644 index 00000000..564828bc --- /dev/null +++ b/src/io/recv_multi.rs @@ -0,0 +1,87 @@ +use crate::buf::bufgroup::BufX; +use crate::io::SharedFd; + +use crate::buf::bufring::BufRing; +use crate::runtime::driver::op::{CqeResult, MultiCQEStream, Op, Streamable}; +use crate::runtime::CONTEXT; +use std::io; + +pub(crate) type RecvMultiStream = Op; + +pub(crate) struct RecvMulti { + /// Holds a strong ref to the FD, preventing the file from being closed + /// while the operation is in-flight. + _fd: SharedFd, + + /// The bufgroup that supplies the bgid and the get_buf function. + group: BufRing, +} + +impl RecvMultiStream { + pub(crate) fn recv_multi( + fd: &SharedFd, + group: BufRing, + flags: Option, + ) -> io::Result { + use io_uring::{opcode, types}; + + CONTEXT.with(|x| { + x.handle() + .expect("Not in a runtime context") + .submit_op_stream( + RecvMulti { + _fd: fd.clone(), + group, + }, + |s| { + opcode::RecvMulti::new(types::Fd(fd.raw_fd()), s.group.bgid()) + .flags(flags.unwrap_or(0)) + .build() + }, + ) + }) + } +} + +impl Streamable for RecvMulti { + type Item = io::Result; + + fn stream_next(&self, cqe: CqeResult) -> Self::Item { + // The kernel is not expected to provide an error when the `more` bit was set, + // but it's just as easy to handle the case here. + let res = cqe.result?; + let flags = cqe.flags; + + // Safety: getting a buffer from the group requires the res and flags values accurately + // identify a buffer and the length which was written to by the kernel. The res and flags + // passed here are those provided by the kernel. 
+ let get_buf_res = unsafe { self.group.get_buf(res, flags) }; + + match get_buf_res { + Ok(Some(bufx)) => Ok(bufx), + Ok(None) => { + unreachable!("unexpected combo of 'more', no error, and no buffer"); + } + Err(e) => Err(e), + } + } + + fn stream_complete(self, cqe: CqeResult) -> Option { + let res = match cqe.result { + Ok(res) => res, + Err(e) => return Some(Err(e)), + }; + let flags = cqe.flags; + + // Safety: getting a buffer from the group requires the res and flags values accurately + // identify a buffer and the length which was written to by the kernel. The res and flags + // passed here are those provided by the kernel. + let get_buf_res = unsafe { self.group.get_buf(res, flags) }; + + match get_buf_res { + Ok(Some(bufx)) => Some(Ok(bufx)), + Ok(None) => None, + Err(e) => Some(Err(e)), + } + } +} diff --git a/src/io/socket.rs b/src/io/socket.rs index 0dea0f35..ef88c5fc 100644 --- a/src/io/socket.rs +++ b/src/io/socket.rs @@ -1,4 +1,5 @@ use crate::io::recv::UnsubmittedRecv; +use crate::io::recv_multi::RecvMultiStream; use crate::io::recv_provbuf::UnsubmittedRecvProvBuf; use crate::io::write::UnsubmittedWrite; use crate::runtime::driver::op::Op; @@ -169,6 +170,14 @@ impl Socket { UnsubmittedOneshot::recv(&self.fd, buf, flags) } + pub(crate) fn recv_multi( + &self, + group: crate::buf::bufring::BufRing, + flags: Option, + ) -> RecvMultiStream { + Op::recv_multi(&self.fd, group, flags).unwrap() + } + pub(crate) fn recv_provbuf( &self, group: crate::buf::bufring::BufRing, diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 7b4dca2f..a6d30b88 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -11,7 +11,9 @@ use crate::{ io::{SharedFd, Socket}, UnsubmittedWrite, }; +use futures_core::Stream; +/// /// A TCP stream between a local and a remote socket. /// /// A TCP stream can either be created by connecting to an endpoint, via the @@ -135,6 +137,63 @@ impl TcpStream { self.inner.recv_provbuf(group, flags).submit().await } + /// Creates a streaming multishot recv(2) operation. + /// + /// Returns a stream from which next().await can be called, returning + /// a Result. + /// + /// # Examples + /// + /// ```no_run + /// use tokio_stream::StreamExt; + /// use tokio_uring::buf::bufring; + /// use tokio_uring::net::TcpStream; + /// + /// async fn recv_multi( + /// stream: TcpStream, + /// group: bufring::BufRing, + /// flags: Option, + /// ) { + /// let buffers = stream.recv_multi(group, flags); + /// tokio::pin!(buffers); + /// + /// while let Some(b) = buffers.next().await { + /// let bufx = match b { + /// Ok(bufx) => bufx, + /// Err(e) => break, + /// }; + /// + /// let read = bufx.len(); + /// if read == 0 { + /// unreachable!(); + /// } + /// + /// // use bufx .. + /// } + /// } + /// ``` + // TODO: after the rewrite using the UnsubmittedXY API, see if this + // can simplified to return a public type that supports poll_next + // without having to resort to the stream! macro. + // For now, it's good enough. Eventually we want recv_multi support for all + // the socket types. + pub fn recv_multi( + &self, + group: bufring::BufRing, + flags: Option, + ) -> impl Stream> + '_ { + use async_stream::stream; + use tokio::pin; + use tokio_stream::StreamExt; + stream! { + let s = self.inner.recv_multi(group, flags); + pin!(s); + while let Some(result) = s.next().await { + yield result + } + } + } + /// Read some data from the stream into a registered buffer. 
/// /// Like [`read`], but using a pre-mapped buffer diff --git a/src/runtime/driver/handle.rs b/src/runtime/driver/handle.rs index 9a89fd1b..2585e8ee 100644 --- a/src/runtime/driver/handle.rs +++ b/src/runtime/driver/handle.rs @@ -21,7 +21,9 @@ use std::rc::{Rc, Weak}; use std::task::{Context, Poll}; use crate::buf::fixed::FixedBuffers; -use crate::runtime::driver::op::{Completable, MultiCQEFuture, Op, Updateable}; +use crate::runtime::driver::op::{ + Completable, MultiCQEFuture, MultiCQEStream, Op, Streamable, Updateable, +}; use crate::runtime::driver::Driver; #[derive(Clone)] @@ -90,6 +92,16 @@ impl Handle { self.inner.borrow_mut().submit_op(data, f, self.into()) } + pub(crate) fn submit_op_stream(&self, data: T, f: F) -> io::Result> + where + T: Streamable, + F: FnOnce(&mut T) -> squeue::Entry, + { + self.inner + .borrow_mut() + .submit_op_stream(data, f, self.into()) + } + pub(crate) fn poll_op(&self, op: &mut Op, cx: &mut Context<'_>) -> Poll where T: Unpin + 'static + Completable, @@ -112,6 +124,17 @@ impl Handle { self.inner.borrow_mut().poll_multishot_op(op, cx) } + pub(crate) fn poll_next_op( + &self, + op: &mut Op, + cx: &mut Context<'_>, + ) -> Poll> + where + T: Unpin + 'static + Streamable, + { + self.inner.borrow_mut().poll_next_op(op, cx) + } + pub(crate) fn remove_op(&self, op: &mut Op) { self.inner.borrow_mut().remove_op(op) } diff --git a/src/runtime/driver/mod.rs b/src/runtime/driver/mod.rs index eaa83427..e8878ae0 100644 --- a/src/runtime/driver/mod.rs +++ b/src/runtime/driver/mod.rs @@ -1,5 +1,7 @@ use crate::buf::fixed::FixedBuffers; -use crate::runtime::driver::op::{Completable, Lifecycle, MultiCQEFuture, Op, Updateable}; +use crate::runtime::driver::op::{ + Completable, Lifecycle, MultiCQEFuture, MultiCQEStream, Op, Streamable, Updateable, +}; use io_uring::opcode::AsyncCancel; use io_uring::{cqueue, squeue, IoUring}; use slab::Slab; @@ -188,6 +190,33 @@ impl Driver { Ok(op) } + pub(crate) fn submit_op_stream( + &mut self, + mut data: T, + f: F, + handle: WeakHandle, + ) -> io::Result> + where + T: Streamable, + F: FnOnce(&mut T) -> squeue::Entry, + { + let index = self.ops.insert(); + + // Configure the SQE + let sqe = f(&mut data).user_data(index as _); + + // Create the operation + let op = Op::new(handle, data, index); + + // Push the new operation + while unsafe { self.uring.submission().push(&sqe).is_err() } { + // If the submission queue is full, flush it to the kernel + self.submit()?; + } + + Ok(op) + } + pub(crate) fn remove_op(&mut self, op: &mut Op) { // Get the Op Lifecycle state from the driver let (lifecycle, completions) = match self.ops.get_mut(op.index()) { @@ -379,6 +408,102 @@ impl Driver { } } } + + pub(crate) fn poll_next_op( + &mut self, + op: &mut Op, + cx: &mut Context<'_>, + ) -> Poll> + where + T: Unpin + 'static + Streamable, + { + // The multishot operations result in zero or `more` CQE being received with no error + // possible. And the last (no `more`) CQE will be received when the operation is finished: + // either with or without an error. + + // One source of Poll::Ready(None) is if this stream has already seen the final CQE in an + // earlier poll round, where the op.index was taken and replaced with usize::MAX. 
+ let Some((lifecycle, completions)) = self + .ops + .get_mut(op.index()) + else { + return Poll::Ready(None); + }; + + match mem::replace(lifecycle, Lifecycle::Submitted) { + Lifecycle::Submitted => { + *lifecycle = Lifecycle::Waiting(cx.waker().clone()); + Poll::Pending + } + Lifecycle::Waiting(waker) => { + if waker.will_wake(cx.waker()) { + *lifecycle = Lifecycle::Waiting(waker); + } else { + *lifecycle = Lifecycle::Waiting(cx.waker().clone()); + } + Poll::Pending + } + Lifecycle::CompletionList(indices) => { + let mut list = indices.into_list(completions); + + let item = list.pop(); + + match item { + Some(cqe) => { + if list.is_empty() { + // Empty list now. Leave Lifecycle newly set to Submitted. + // Forgetting list lets us mutably borrow self.ops again below. + std::mem::forget(list); + } else { + // Update the Lifecycle with the shortened list. + *lifecycle = Lifecycle::CompletionList(list.into_indices()); + } + + if cqueue::more(cqe.flags) { + // Not the last CQE for this operation. + // Borrow op.data to call its stream_next. + match op.data() { + Some(data) => Poll::Ready(Some(data.stream_next(cqe))), + None => { + // No data also means no index, so this state is unreachable. + // The Poll::None would have already been returned at the top + // of this function. + unreachable!("Streaming CompletionList/data-None case"); + } + } + } else { + // The first of two ways the op is removed from the slab. + // Duplicate logic below. Also more details below. + let index = op.take_index(); + self.ops.remove(index); + let data = op.take_data().unwrap(); + Poll::Ready(data.stream_complete(cqe)) + } + } + None => { + // List was already empty. This poll function makes this state impossible. + unreachable!("Streaming CompletionList empty case"); + } + } + } + Lifecycle::Completed(_entry) => { + // The second of two ways the op is removed from the slab. + // Duplicate logic above. + + // Take the index, leaving a value that will be out of bounds so on the next + // poll of the steam, None can be returned. + // And take the data, passing ownership to stream_complete. + // + // stream_complete() is interesting. It can return None or Some(item) so this is + // also a place with Poll::Ready(None) might be returned. + let index = op.take_index(); + self.ops.remove(index); + let data = op.take_data().unwrap(); + Poll::Ready(data.stream_complete(_entry.into())) + } + Lifecycle::Ignored(..) => unreachable!(), + } + } } impl AsRawFd for Driver { diff --git a/src/runtime/driver/op/mod.rs b/src/runtime/driver/op/mod.rs index 32ba3e7a..1421d7f6 100644 --- a/src/runtime/driver/op/mod.rs +++ b/src/runtime/driver/op/mod.rs @@ -3,6 +3,7 @@ use std::io; use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll, Waker}; +use tokio_stream::Stream; use io_uring::{cqueue, squeue}; @@ -140,13 +141,28 @@ pub(crate) struct Op { /// A Marker for Ops which expect only a single completion event pub(crate) struct SingleCQE; -/// A Marker for Operations will process multiple completion events, +/// A Marker for Operations that will process multiple completion events, /// which combined resolve to a single Future value pub(crate) struct MultiCQEFuture; +/// A Marker for Operations that will process multiple completion events, +/// each resolving in a streamed value. +/// +/// The semantics of such a stream are: +/// +/// While 'more' CQE are received, they are by definition, successful so the poll_next will return +/// Some(Ok(item)). 
When the last CQE is received, the 'more' bit is no longer set, the stream's +/// poll_next will return None, Some(Ok(item)) or Some(Err(e)), depending on the CQE result and +/// flags values, after which additional poll_next calls will return None. +pub(crate) struct MultiCQEStream; + +// TODO make the MultiCQEStream and the MultiCQEFuture cancel safe. At least in terms of getting +// more-link entries removed from the slab so the drop of the driver does not hang. +// Probably wait to do this until after the new Unsubmitted* API for the `more` cases is in place. + pub(crate) trait Completable { type Output; - /// `complete` will be called for cqe's do not have the `more` flag set + /// `complete` will be called for cqe's which do not have the `more` flag set fn complete(self, cqe: CqeResult) -> Self::Output; } @@ -156,6 +172,17 @@ pub(crate) trait Updateable: Completable { fn update(&mut self, cqe: CqeResult); } +pub(crate) trait Streamable { + type Item; + + /// `stream_next` will be called for cqe's which have the `more` flag set. + fn stream_next(&self, cqe: CqeResult) -> Self::Item; + + /// `stream_complete` will be called for cqe's which do not have the `more` flag set and will + /// return None or Some(Self::Item). + fn stream_complete(self, cqe: CqeResult) -> Option; +} + pub(crate) enum Lifecycle { /// The operation has been submitted to uring and is currently in-flight Submitted, @@ -209,10 +236,20 @@ impl Op { self.index } + pub(super) fn take_index(&mut self) -> usize { + let index = self.index; + self.index = usize::MAX; + index + } + pub(super) fn take_data(&mut self) -> Option { self.data.take() } + pub(super) fn data(&self) -> Option<&T> { + self.data.as_ref() + } + pub(super) fn insert_data(&mut self, data: T) { self.data = Some(data); } @@ -246,6 +283,20 @@ where } } +impl Stream for Op +where + T: Unpin + 'static + Streamable, +{ + type Item = T::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.driver + .upgrade() + .expect("Not in runtime context") + .poll_next_op(self.get_mut(), cx) + } +} + /// The operation may have pending cqe's not yet processed. 
/// To manage this, the lifecycle associated with the Op may if required /// be placed in LifeCycle::Ignored state to handle cqe's which arrive after diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 2c85329e..413fc5d5 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -2,16 +2,38 @@ pub mod probe; use std::net::SocketAddr; +use tokio::io::{self, Error, ErrorKind}; use tokio::sync::oneshot; use tokio::task::JoinSet; +use tokio::time; +use tokio_stream::StreamExt; use tokio_uring::buf::bufring; use tokio_uring::net::{TcpListener, TcpStream}; #[derive(Clone)] pub enum Rx { - Read, - Recv(Option), - RecvBufRing(bufring::BufRing, Option), + Read(ReadProps), + Recv(RecvProps), + RecvBufRing(BufRingProps), + RecvMulti(BufRingProps), +} + +#[derive(Clone)] +pub struct ReadProps { + pub buf_size: usize, +} + +#[derive(Clone)] +pub struct RecvProps { + pub buf_size: usize, + pub flags: Option, +} + +#[derive(Clone)] +pub struct BufRingProps { + pub buf_ring: bufring::BufRing, + pub flags: Option, + pub quiet_overflow: bool, } pub async fn tcp_listener() -> Result<(TcpListener, SocketAddr), std::io::Error> { @@ -28,124 +50,255 @@ pub async fn tcp_listener() -> Result<(TcpListener, SocketAddr), std::io::Error> pub fn is_no_buffer_space_available(e: &std::io::Error) -> bool { e.raw_os_error() == Some(105) } +#[inline] +pub fn is_connection_reset_by_peer(e: &std::io::Error) -> bool { + e.raw_os_error() == Some(104) +} +#[inline] +pub fn is_broken_pipe(e: &std::io::Error) -> bool { + e.raw_os_error() == Some(32) +} + +async fn _client_ping_pong_recv_multi( + stream: TcpStream, + send_cnt: usize, + send_buf: Vec, + props: &BufRingProps, +) -> io::Result<()> { + // Send one recv_multi, and then some number of write_all operations. + let BufRingProps { + buf_ring, + flags, + quiet_overflow, + } = props; + let mut send_buf = send_buf; + let mut sent: usize = 0; + let expect = send_buf.len(); + let mut got: usize = expect; + + // Invert the normal loop counters in order to perform the multishot recv at the top. + while sent < send_cnt || got < expect { + let buffers = stream.recv_multi(buf_ring.clone(), *flags); + tokio::pin!(buffers); + + 'new_stream: while sent < send_cnt || got < expect { + if got > expect { + return Err(Error::new( + ErrorKind::Other, + format!("client read got {got}, more than expected of {expect}"), + )); + } + if got == expect { + got = 0; + sent += 1; + let result; + (result, send_buf) = stream.write_all(send_buf).await; + let _ = result?; + } + + // Keep receiving until we've gotten enough. + while let Some(b) = buffers.next().await { + let bufx = match b { + Ok(buf) => buf, + Err(e) => { + if is_no_buffer_space_available(&e) { + if !quiet_overflow { + println!("client: sleep 1us, recoverable recv_multi error {}", e); + } + time::sleep(time::Duration::from_micros(1)).await; + // Have a stream restarted with a new read. + break 'new_stream; + } + return Err(Error::new(ErrorKind::Other, + format!("client with send {sent}, got {got}, expect {expect}, sees recv_multi error {e}"))); + } + }; + + let read = bufx.len(); + got += read; + if got >= expect { + continue 'new_stream; + } + } + // The buffer stream returned None, indicating the command finished with result (length) 0. + println!("line {}: buffer stream ended: client has sent {sent} packets and got {got} bytes bytes while expecting {expect}", line!()); + break 'new_stream; + } + if sent < send_cnt || got < expect { + // Don't print when we've reached the limit? 
+ // These tests could be structured differently ... + println!("line {}: client has sent {sent} packets and got {got} bytes bytes while expecting {expect}", line!()); + } + + // TODO work on cancel of the `buffers` stream in-flight command. If we drop the stream + // without it having completed the in-flight command an* we expect to be able to read + // from the file descriptor again, we have to have the first in-flight cancelled first. In + // fact, it seems data can be lost, and a bufring bid can be lost track of, if not + // explicitly waiting for a cancel or close of the buffers stream. There is no outstanding + // BufX created, but the bid will have sat in a CQE that didn't get mapped back to the + // stream future to handle, so will have been disgarded. Another outstanding problem with + // dropped streams is one or multiple more-links may be left in the slab. All these are + // related to having the ability to cancel an in-flight operation where the op itself has + // been dropped and the data it kept has been transferred to the slab's lifecycle entry. + } + Ok(()) +} -async fn client_ping_pong(rx: Rx, stream: &TcpStream, send_cnt: usize, send_length: usize) { +async fn _client_ping_pong( + rx: Rx, + stream: TcpStream, + send_cnt: usize, + send_length: usize, +) -> io::Result<()> { // Implement client ping-pong loop. Make several read variations available. - for _ in 0..send_cnt { - // Make this vector longer to cause splits in round trip transmission. - let buf = vec![1u8; send_length]; + let mut send_buf = vec![1u8; send_length]; + let expect = send_buf.len(); - let (result, buf) = stream.write_all(buf).await; - let _result = result.unwrap(); + // Handle the recv_multi option separately because the stream created requires being pinned and + // I couldn't figure out how to unpin it again to put it back into the Some(buffers) form. + match &rx { + Rx::RecvMulti(props) => { + return _client_ping_pong_recv_multi(stream, send_cnt, send_buf, props).await + } + _ => {} + }; + + // Only used for two of the cases below, but logically, it's easier to create once even if it + // won't be used. 
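+    // (The provided-buffer variants fall through to a length of zero, so the Vec
+    // below is empty for them.)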
+ let recv_buf_length = match &rx { + Rx::Read(props) => { + let ReadProps { buf_size } = props; + *buf_size + } + Rx::Recv(props) => { + let RecvProps { buf_size, flags: _ } = props; + *buf_size + } + _ => 0, + }; + let mut recv_buf = vec![0u8; recv_buf_length]; + + for _ in 0..send_cnt { + let result; + (result, send_buf) = stream.write_all(send_buf).await; + let _ = result.unwrap(); - let expect = buf.len(); let mut got: usize = 0; - let mut buf = buf; while got < expect { - let result; - - result = match &rx { - Rx::Read => { + match &rx { + Rx::Read(_) => { let result; - (result, buf) = stream.read(buf).await; - result + (result, recv_buf) = stream.read(recv_buf).await; + let read = result.unwrap(); + if read == 0 { + return Err(Error::new(ErrorKind::Other, format!("client read returned 0, but got had reached only {}, while expected is {}", got, expect))); + } + got += read; } - Rx::Recv(flags) => { + Rx::Recv(props) => { let result; - (result, buf) = stream.recv(buf, *flags).await; - result + (result, recv_buf) = stream.recv(recv_buf, props.flags).await; + let read = result.unwrap(); + if read == 0 { + return Err(Error::new(ErrorKind::Other, format!("client read returned 0, but got had reached only {}, while expected is {}", got, expect))); + } + got += read; } - Rx::RecvBufRing(group, flags) => { + Rx::RecvBufRing(props) => { + let BufRingProps { + buf_ring, + flags, + quiet_overflow, + } = props; + // Loop to handle case where the buf_ring pool was exhausted. loop { - let buf = stream.recv_provbuf(group.clone(), *flags).await; - match buf { - Ok(Some(buf)) => { + let bufx = stream.recv_provbuf(buf_ring.clone(), *flags).await; + match bufx { + Ok(Some(bufx)) => { // If returning a Vec were necessary: // Either form of conversion from Bufx data to Vec could be appropriate here. // One consumes the BufX, the other doesn't and let's it drop here. - // break (Ok(buf.len()), buf.into()) - // break (Ok(buf.len()), buf.as_slice().to_vec()); - break Ok(buf.len()); + // break (Ok(bufx.len()), bufx.into()) + // break (Ok(bufx.len()), bufx.as_slice().to_vec()); + got += bufx.len(); + break; } Ok(None) => { - // The connection is closed. Report 0 bytes read. - break Ok(0); + // The connection is closed. But we are short of expected. + return Err(Error::new(ErrorKind::Other, format!("client read returned 0, but got had reached only {}, while expected is {}", got, expect))); } Err(e) => { - // Expected error: No buffer space available (os error 105) - - // but sometimes getting error indicating the returned res was 0 - // and flags was 4. - if e.kind() == std::io::ErrorKind::Other { - eprintln!( - "client: assuming connection is closed: ecv_provbuf error {}", - e - ); - break Err(e); - } - // Normal for some of the tests cases to cause the bufring to be exhuasted. - if !is_no_buffer_space_available(&e) { - panic!("client: recv_provbuf error {}", e); + // Normal for some of the tests cases to cause the bufring to be exhausted. + if is_no_buffer_space_available(&e) { + if !quiet_overflow { + println!( + "client: sleep 1us, recoverable recv_provbuf error {}", + e + ); + } + time::sleep(time::Duration::from_micros(1)).await; + continue; } + return Err( + Error::new(ErrorKind::Other, + format!("client with got {got}, expect {expect}, sees recv_provbuf error {e}"))); } } } } - }; - let read = result.unwrap(); - if read == 0 { - panic!( - "read of 0 but expected not yet reached, got {}, expected {}", - got, expect - ); + Rx::RecvMulti(_) => { + // This case was handled earlier in the function. 
+ unreachable!(); + } } - got += read; - // level1-println!("client: read {}", read); - // println!("client: read: {:?}", &_buf[..read]); } } + Ok(()) } -async fn server_ping_pong_reusing_vec( - rx: Rx, - stream: TcpStream, - buf: Vec, - _local_addr: SocketAddr, -) { +async fn _server_ping_pong_reusing_vec(rx: Rx, stream: TcpStream, _local_addr: SocketAddr) { use tokio_uring::buf::BoundedBuf; // for slice() - let mut buf = buf; - // level1-println!("server: {} connected", peer); + let recv_buf_length = match &rx { + Rx::Read(props) => { + let ReadProps { buf_size } = props; + *buf_size + } + Rx::Recv(props) => { + let RecvProps { buf_size, flags: _ } = props; + *buf_size + } + _ => 0, + }; + let mut buf = vec![0u8; recv_buf_length]; let mut _n = 0; loop { let (result, nbuf) = match &rx { - Rx::Read => stream.read(buf).await, - Rx::Recv(flags) => stream.recv(buf, *flags).await, - Rx::RecvBufRing(_, _) => unreachable!(), + Rx::Read(_) => stream.read(buf).await, + Rx::Recv(props) => stream.recv(buf, props.flags).await, + Rx::RecvBufRing(_) => unreachable!(), + Rx::RecvMulti(_) => unreachable!(), }; buf = nbuf; let read = result.unwrap(); if read == 0 { - // level1-println!("server: {} closed, {} total ping-ponged", peer, _n); break; } let (res, slice) = stream.write_all(buf.slice(..read)).await; let _ = res.unwrap(); buf = slice.into_inner(); - // level1-println!("server: {} all {} bytes ping-ponged", peer, read); _n += read; } } -async fn server_ping_pong_using_buf_ring( +async fn _server_ping_pong_using_recv_bufring_oneshot( stream: TcpStream, - group: &bufring::BufRing, - flags: Option, _local_addr: SocketAddr, + props: &BufRingProps, ) { // Serve the connection by looping on input, each received bufx from the kernel which // we let go out of scope when we are done so it can be given back to the kernel. @@ -155,12 +308,17 @@ async fn server_ping_pong_using_buf_ring( // where the buffer being written to by the kernel is picked by the kernel from a // provided buffer pool, and when finished working with the buffer, it is returned to // the kernel's provided buffer pool. + let BufRingProps { + buf_ring, + flags, + quiet_overflow, + } = props; let mut _n = 0; loop { // Loop to allow trying again if there was no buffer space available. let bufx = loop { - let buf = stream.recv_provbuf(group.clone(), flags).await; + let buf = stream.recv_provbuf(buf_ring.clone(), *flags).await; match buf { Ok(Some(buf)) => break buf, Ok(None) => { @@ -177,16 +335,20 @@ async fn server_ping_pong_using_buf_ring( // and flags was 4. Treat this like the connection is closed while // awaiting confirmation from the io_uring team. if e.kind() == std::io::ErrorKind::Other { - eprintln!( + println!( "server: assuming connection is closed: recv_provbuf error {}", e ); return; } - // Normal for some of the tests cases to cause the bufring to be exhuasted. + // Normal for some of the tests cases to cause the bufring to be exhausted. 
 if !is_no_buffer_space_available(&e) {
 panic!("server: recv_provbuf error {}", e);
 }
+ if !quiet_overflow {
+ println!("server: sleep 1us, recoverable recv_provbuf error {}", e);
+ }
+ time::sleep(time::Duration::from_micros(1)).await;
 }
 }
 };
@@ -209,44 +371,129 @@ async fn server_ping_pong_using_buf_ring(

 let (res, _) = stream.write_all(bufx).await;
 let _ = res.unwrap();
- // level1-println!("server: {} all {} bytes ping-ponged with bufx", peer, read);
 _n += read;
 }
 }

-pub async fn async_block_ping_pong_listener_loop(server: Server, listener: TcpListener) {
- let Server { rx } = server;
+async fn _server_ping_pong_using_recv_multishot(
+ stream: TcpStream,
+ _local_addr: SocketAddr,
+ props: &BufRingProps,
+) {
+ // Serve the connection by looping on input; each bufx received from the kernel is
+ // let go out of scope when we are done with it so it can be given back to the kernel.
+ //
+ // Here is a completion model based loop, as described in
+ // https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023
+ // where the buffer being written to by the kernel is picked by the kernel from a
+ // provided buffer pool, and when finished working with the buffer, it is returned to
+ // the kernel's provided buffer pool.
+ let BufRingProps {
+ buf_ring,
+ flags,
+ quiet_overflow,
+ } = props;
+
+ let mut total = 0;
+
+ loop {
+ let buffers = stream.recv_multi(buf_ring.clone(), *flags);
+ tokio::pin!(buffers);
+ while let Some(buf) = buffers.next().await {
+ let bufx = match buf {
+ Ok(buf) => buf,
+ Err(e) => {
+ if is_no_buffer_space_available(&e) {
+ if !quiet_overflow {
+ println!("server: sleep 1us, recoverable recv_multi error {}", e);
+ }
+ time::sleep(time::Duration::from_micros(1)).await;
+ break;
+ }
+ if is_connection_reset_by_peer(&e) {
+ // This seems to be a normal condition that is sometimes caught, sometimes
+ // not. If the clients all end quickly, taking down their connection
+ // endpoints so the test can then complete, the fact that some or all of
+ // the server tasks see their connections reset by peer can go unreported.
+ println!("server: ending task after reading {total} bytes due to recv_multi error {}", e);
+ return;
+ }
+ panic!(
+ "server: after reading {total} bytes, fatal recv_multi error {}",
+ e
+ );
+ }
+ };
+
+ let read = bufx.len();
+
+ // The stream should not provide buffers with zero length. A zero length CQE result is
+ // expected to mean the file descriptor will have no more data to read.
+ assert!(read > 0);
+ total += read;
+
+ // Writing bufx or bufx.slice(..read) works equally well as the bufx length *is*
+ // the length that was read.
+ // let (res, _) = stream.write_all(bufx.slice(..read)).await;
+ // let (res, _) = stream.write_all(bufx).await;
+ //
+ // The returned _ represents the BufX or slice of the BufX which we let go out of scope.
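+ // (Hedged note: the write can take the BufX by value because BufX is assumed
+ // to satisfy the crate's buffer bounds; when the write completes and this
+ // BufX is dropped, its buffer id goes back on the kernel's buf_ring for reuse.)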
+
+ let (res, _) = stream.write_all(bufx).await;
+
+ match res {
+ Ok(()) => {}
+ Err(e) => {
+ if is_broken_pipe(&e) {
+ println!("server: ending task after reading {total} bytes due to sometimes normal write_all error {}", e);
+ } else {
+ println!("server: ending task after reading {total} bytes due to unexpected write_all error {}", e);
+ }
+ return;
+ }
+ }
+ }
+ }
+}
+
+pub async fn ping_pong_listener_loop(server: Server, listener: TcpListener) {
+ let Server { rx, nodelay } = server;
 loop {
 let (stream, socket_addr) = listener.accept().await.unwrap();
+
+ if nodelay {
+ let _ = stream.set_nodelay(true).unwrap();
+ }
+
 let rx = rx.clone();

- // Spawn new task for each connnection
+ // Spawn task for each accepted connection.
 tokio_uring::spawn(async move {
 match &rx {
- Rx::Read | Rx::Recv(_) => {
- let buf = vec![0u8; 16 * 1024];
- server_ping_pong_reusing_vec(rx, stream, buf, socket_addr).await;
+ Rx::Read(_) | Rx::Recv(_) => {
+ _server_ping_pong_reusing_vec(rx, stream, socket_addr).await;
+ }
+ Rx::RecvBufRing(props) => {
+ _server_ping_pong_using_recv_bufring_oneshot(stream, socket_addr, props).await;
 }
- Rx::RecvBufRing(group, flags) => {
- server_ping_pong_using_buf_ring(stream, group, *flags, socket_addr).await;
+ Rx::RecvMulti(props) => {
+ _server_ping_pong_using_recv_multishot(stream, socket_addr, props).await;
 }
 };
 });
 }
 }

-fn spawn_ping_pong_listener_loop(server: Server, listener: TcpListener) {
- tokio_uring::spawn(async move {
- async_block_ping_pong_listener_loop(server, listener).await;
- });
-}
-
-pub fn ping_pong_clients(clients: Clients, listener_addr: SocketAddr) -> oneshot::Receiver<()> {
+pub fn ping_pong_clients(
+ clients: Clients,
+ listener_addr: SocketAddr,
+) -> oneshot::Receiver<io::Result<()>> {
 // Spawn clients as tokio_uring tasks and return a tokio oneshot receiver
 // that will indicate when they are done.

 let Clients {
 rx,
+ nodelay,
 client_cnt,
 send_cnt,
 send_length,
@@ -260,31 +507,52 @@ pub fn ping_pong_clients(clients: Clients, listener_addr: SocketAddr) -> oneshot
 let rx = rx.clone();
 set.spawn_local(async move {
 let stream = TcpStream::connect(listener_addr).await.unwrap();
- client_ping_pong(rx, &stream, send_cnt, send_length).await;
+ if nodelay {
+ let _ = stream.set_nodelay(true).unwrap();
+ }

- client_id // return through handle
+ let res = _client_ping_pong(rx, stream, send_cnt, send_length).await;
+ (client_id, res) // return through handle
 });
 }

- let (tx, rx) = oneshot::channel::<()>();
+ let (tx, rx) = oneshot::channel::<io::Result<()>>();

 tokio_uring::spawn(async move {
+ let mut failed: usize = 0;
 let mut seen = vec![false; client_cnt];
 while let Some(res) = set.join_next().await {
- let client_id = res.unwrap();
+ let (client_id, res) = res.unwrap();
+ match res {
+ Ok(()) => {
+ println!("client {client_id} succeeded");
+ }
+ Err(e) => {
+ failed += 1;
+ println!("client {client_id} failed: {e}");
+ }
+ }
 seen[client_id] = true;
 }
 for i in 0..client_cnt {
 assert!(seen[i]);
 }
- let _ = tx.send(()).unwrap();
+ let res = if failed == 0 {
+ Ok(())
+ } else {
+ Err(Error::new(
+ ErrorKind::Other,
+ format!("{failed} client(s) failed"),
+ ))
+ };
+ let _ = tx.send(res).unwrap();
 });

 rx
 }

-async fn _ping_pong(clients: Clients, server: Server) {
+async fn _ping_pong(clients: Clients, server: Server) -> io::Result<()> {
 // Run `client_cnt` clients. Both clients and server use the TcpStream method identified by `rx`.
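+ // The sequence below: bind the listener first so its ephemeral address is
+ // known, spawn the client tasks, then the listener loop, and finally await
+ // the clients' aggregated io::Result through the oneshot receiver.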
let (listener, listener_addr) = tcp_listener().await.unwrap(); @@ -293,19 +561,19 @@ async fn _ping_pong(clients: Clients, server: Server) { let clients_done = ping_pong_clients(clients, listener_addr); - // Spawn one server + // Spawn one listener, one server task will be spawned per connection accepted. - spawn_ping_pong_listener_loop(server, listener); + tokio_uring::spawn(async move { + ping_pong_listener_loop(server, listener).await; + }); // Wait until the clients signal they are done - - // println!("common/mode.rs:{} now wait for clients to be done", line!()); - let _ = clients_done.await.unwrap(); - // println!("common/mode.rs:{} clients report being done", line!()); + clients_done.await.unwrap() } pub struct Clients { pub rx: Rx, + pub nodelay: bool, pub client_cnt: usize, pub send_cnt: usize, pub send_length: usize, @@ -313,6 +581,7 @@ pub struct Clients { pub struct Server { pub rx: Rx, + pub nodelay: bool, } pub struct PingPong { @@ -321,8 +590,8 @@ pub struct PingPong { } impl PingPong { - pub async fn run(self) { + pub async fn run(self) -> io::Result<()> { let PingPong { clients, server } = self; - _ping_pong(clients, server).await; + _ping_pong(clients, server).await } } diff --git a/tests/net.rs b/tests/net.rs index 32d1d505..2b2600ed 100644 --- a/tests/net.rs +++ b/tests/net.rs @@ -1,5 +1,6 @@ use std::sync::mpsc::sync_channel; use std::thread; +use tokio::io; use tokio_uring::buf::bufring; @@ -9,67 +10,93 @@ use common::probe; use common::Rx; #[test] -fn net_tcp_ping_pong_read_one() { +fn net_tcp_ping_pong_read_one() -> io::Result<()> { // Run one client. Both client and server use the TcpStream `read` method. tokio_uring::start(async { common::PingPong { clients: common::Clients { - rx: Rx::Read, + rx: Rx::Read(common::ReadProps { + buf_size: 16 * 1024, + }), + nodelay: false, client_cnt: 1, send_cnt: 10, send_length: 1024, }, - server: common::Server { rx: Rx::Read }, + server: common::Server { + rx: Rx::Read(common::ReadProps { + buf_size: 16 * 1024, + }), + nodelay: false, + }, } .run() - .await; - }); + .await + }) } #[test] -fn net_tcp_ping_pong_read_several() { +fn net_tcp_ping_pong_read_several() -> io::Result<()> { // Run 3 clients. Both clients and server use the TcpStream `read` method. tokio_uring::start(async { common::PingPong { clients: common::Clients { - rx: Rx::Read, + rx: Rx::Read(common::ReadProps { + buf_size: 16 * 1024, + }), + nodelay: false, client_cnt: 3, send_cnt: 10, send_length: 1024, }, - server: common::Server { rx: Rx::Read }, + server: common::Server { + rx: Rx::Read(common::ReadProps { + buf_size: 16 * 1024, + }), + nodelay: false, + }, } .run() - .await; - }); + .await + }) } #[test] -fn net_tcp_ping_pong_recv() { +fn net_tcp_ping_pong_recv() -> io::Result<()> { // Run 3 clients. Both clients and server use the TcpStream `recv` method. 
 tokio_uring::start(async {
 common::PingPong {
 clients: common::Clients {
- rx: Rx::Recv(None),
+ rx: Rx::Recv(common::RecvProps {
+ buf_size: 16 * 1024,
+ flags: None,
+ }),
+ nodelay: false,
 client_cnt: 3,
 send_cnt: 10,
 send_length: 1024,
 },
- server: common::Server { rx: Rx::Recv(None) },
+ server: common::Server {
+ rx: Rx::Recv(common::RecvProps {
+ buf_size: 16 * 1024,
+ flags: None,
+ }),
+ nodelay: false,
+ },
 }
 .run()
- .await;
- });
+ .await
+ })
 }

 #[test]
-fn net_tcp_ping_pong_recv_oneshot_bufring() {
+fn net_tcp_ping_pong_recv_oneshot_bufring_1_thread() -> io::Result<()> {
 if !probe::is_buf_ring_supported() {
 eprintln!("skipping test, buf_ring is not supported in this kernel");
- return;
+ return Ok(());
 }
 // Run 5 clients. Both clients and server use the TcpStream `recv` method with a BufRing pool
 // that is built small enough (4 entries) that there will be some pool exhaustion that has to
@@ -91,30 +118,107 @@ fn net_tcp_ping_pong_recv_oneshot_bufring() {

 common::PingPong {
 clients: common::Clients {
- rx: Rx::RecvBufRing(buf_ring.clone(), None),
- client_cnt: 3,
+ rx: Rx::RecvBufRing(common::BufRingProps {
+ buf_ring: buf_ring.clone(),
+ flags: None,
+ quiet_overflow: true, // normal for this test to cause pool exhaustion
+ }),
+ nodelay: false,
+ client_cnt: 10,
 send_cnt: 10,
 send_length: 1024,
 },
 server: common::Server {
- rx: Rx::RecvBufRing(buf_ring.clone(), None),
+ rx: Rx::RecvBufRing(common::BufRingProps {
+ buf_ring: buf_ring.clone(),
+ flags: None,
+ quiet_overflow: true, // normal for this test to cause pool exhaustion
+ }),
+ nodelay: false,
 },
 }
 .run()
- .await;
+ .await?;

 // Manually unregistering the buf_ring. When it goes out of scope, it is unregistered
 // automatically. Note, it remains in scope while there are outstanding buffers the
 // application hasn't dropped yet.
 buf_ring.unregister().unwrap();
- });
+ Ok(())
+ })
 }

 #[test]
-fn net_tcp_ping_pong_recv_oneshot_bufring_2_threads() {
+fn net_tcp_ping_pong_recv_multishot_bufring_1_thread() -> io::Result<()> {
 if !probe::is_buf_ring_supported() {
 eprintln!("skipping test, buf_ring is not supported in this kernel");
- return;
+ return Ok(());
+ }
+ // Run 4 clients. Both clients and server use the TcpStream `recv_multi` method, each
+ // side with its own BufRing pool built small enough (4 and 2 entries) that there will
+ // be some pool exhaustion that has to be handled by retrying the requests.
+ // The clients share one BufRing and the server uses another, and all are run in
+ // the same tokio_uring instance.
+
+ tokio_uring::start(async {
+ let client_buf_ring = bufring::Builder::new(177)
+ .ring_entries(4)
+ .buf_len(4096)
+ // Normally, no reason to skip the auto register, but this lets us test the manual
+ // register below.
+ .skip_auto_register(true)
+ .build()
+ .unwrap();
+ let server_buf_ring = bufring::Builder::new(178)
+ .ring_entries(2)
+ .buf_len(4096)
+ // Normally, no reason to skip the auto register, but this lets us test the manual
+ // register below.
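+ // (Presumably build() only allocates the ring while auto register is
+ // skipped; the kernel does not see this bgid until register() is
+ // called explicitly below.)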
+ .skip_auto_register(true) + .build() + .unwrap(); + + client_buf_ring.register().unwrap(); + server_buf_ring.register().unwrap(); + + common::PingPong { + clients: common::Clients { + rx: Rx::RecvMulti(common::BufRingProps { + buf_ring: client_buf_ring.clone(), + flags: None, + quiet_overflow: false, + }), + nodelay: false, + client_cnt: 4, + send_cnt: 10, + send_length: 1024, + }, + server: common::Server { + rx: Rx::RecvMulti(common::BufRingProps { + buf_ring: server_buf_ring.clone(), + flags: None, + quiet_overflow: true, // normal for this test to have to retry given the small pool size + }), + nodelay: false, + }, + } + .run() + .await?; + + // Manually unregistering the buf_rings. When any goes out of scope, it is unregistered + // automatically. Note, it remains in scope while there are outstanding buffers the + // application hasn't dropped yet. + client_buf_ring.unregister().unwrap(); + server_buf_ring.unregister().unwrap(); + Ok(()) + }) +} + +#[test] +fn net_tcp_ping_pong_recv_oneshot_bufring_2_threads() -> io::Result<()> { + if !probe::is_buf_ring_supported() { + eprintln!("skipping test, buf_ring is not supported in this kernel"); + return Ok(()); } // Similar to test net_tcp_ping_pong_recv_bufring above, but uses two new threads, // one for the server code, one for all the clients. @@ -175,12 +279,17 @@ fn net_tcp_ping_pong_recv_oneshot_bufring_2_threads() { .build() .unwrap(); let server = common::Server { - rx: Rx::RecvBufRing(buf_ring.clone(), None), + rx: Rx::RecvBufRing(common::BufRingProps { + buf_ring: buf_ring.clone(), + flags: None, + quiet_overflow: false, + }), + nodelay: false, }; let (listener, local_addr) = common::tcp_listener().await.unwrap(); addr_tx.send(local_addr).unwrap(); - common::async_block_ping_pong_listener_loop(server, listener).await; + common::ping_pong_listener_loop(server, listener).await; }); }); @@ -195,7 +304,12 @@ fn net_tcp_ping_pong_recv_oneshot_bufring_2_threads() { .build() .unwrap(); let clients = common::Clients { - rx: Rx::RecvBufRing(buf_ring.clone(), None), + rx: Rx::RecvBufRing(common::BufRingProps { + buf_ring: buf_ring.clone(), + flags: None, + quiet_overflow: false, + }), + nodelay: false, client_cnt: CLIENT_CNT, send_cnt: SENDS_PER_CLIENT, send_length: SEND_LENGTH, @@ -204,12 +318,119 @@ fn net_tcp_ping_pong_recv_oneshot_bufring_2_threads() { // Wait for the clients tasks to be done. - // println!("net.rs:{} now wait for clients to be done", line!()); - let _ = clients_done.await.unwrap(); - // println!("net.rs:{} clients report being done", line!()); + clients_done.await.unwrap() + }) + }); + + // Wait for the clients thread to finish. + clients_handle.join().unwrap() +} + +#[test] +fn net_tcp_ping_pong_recv_multishot_bufring_2_threads() -> io::Result<()> { + if !probe::is_buf_ring_supported() { + eprintln!("skipping test, buf_ring is not supported in this kernel"); + return Ok(()); + } + + use libc::{sysconf, _SC_PAGESIZE}; + let page_size: usize = unsafe { sysconf(_SC_PAGESIZE) as usize }; + + /* + * Even with SERVER_NODELAY = true, + * this yields a test run that takes about 36s/debug, 20s/release. + * This one is very interesting because it hits the case where the multishot recv was completed + * while there was still more data to read. So the client logic had to be rewritten to allow + * the multishot recv commands to be issued more than once while remembering how far the client + * had gotten in sending packets and in receiving the parts of the packet. 
+ const CLIENT_CNT: usize = 32;
+ const SENDS_PER_CLIENT: usize = 64 * 64;
+ const SEND_LENGTH: usize = 64 * 1024;
+ const CLIENT_BUFRING_SIZE: u16 = 16 * 64;
+ const SERVER_BUFRING_SIZE: u16 = 16 * 64;
+ const CLIENT_NODELAY: bool = true;
+ const SERVER_NODELAY: bool = true;
+ */
+ /*
+ * These yield a test run that takes about 3s,
+ * unless the SERVER_NODELAY is set to true, and then only 0.7s.
+ const CLIENT_CNT: usize = 32;
+ const SENDS_PER_CLIENT: usize = 64;
+ const SEND_LENGTH: usize = 64 * 1024;
+ const CLIENT_BUFRING_SIZE: u16 = 16*64;
+ const SERVER_BUFRING_SIZE: u16 = 16*64;
+ const CLIENT_NODELAY: bool = false;
+ const SERVER_NODELAY: bool = true;
+ */
+ /*
+ * The normal small test case.
+ */
+ const CLIENT_CNT: usize = 4;
+ const SENDS_PER_CLIENT: usize = 4;
+ const SEND_LENGTH: usize = 4 * 1024;
+ const CLIENT_BUFRING_SIZE: u16 = 8;
+ const SERVER_BUFRING_SIZE: u16 = 8;
+ const CLIENT_NODELAY: bool = false;
+ const SERVER_NODELAY: bool = false;
+
+ const CLIENT_BUF_LEN: usize = 4096;
+ const SERVER_BUF_LEN: usize = 4096;
+
+ // Used by the thread running the server to pass its ephemeral local port to the thread
+ // running the clients.
+ let (addr_tx, addr_rx) = sync_channel::<SocketAddr>(0);
+
+ let _server_handle = thread::spawn(move || {
+ tokio_uring::start(async {
+ let buf_ring = bufring::Builder::new(261)
+ .page_size(page_size)
+ .ring_entries(SERVER_BUFRING_SIZE)
+ .buf_len(SERVER_BUF_LEN)
+ .build()
+ .unwrap();
+ let server = common::Server {
+ rx: Rx::RecvMulti(common::BufRingProps {
+ buf_ring: buf_ring.clone(),
+ flags: None,
+ quiet_overflow: false,
+ }),
+ nodelay: SERVER_NODELAY,
+ };
+ let (listener, local_addr) = common::tcp_listener().await.unwrap();
+ addr_tx.send(local_addr).unwrap();
+
+ common::ping_pong_listener_loop(server, listener).await;
+ });
+ });

+ let listener_addr = addr_rx.recv().unwrap();
+
+ let clients_handle = thread::spawn(move || {
+ tokio_uring::start(async {
+ let buf_ring = bufring::Builder::new(262)
+ .page_size(page_size)
+ .ring_entries(CLIENT_BUFRING_SIZE as u16)
+ .buf_len(CLIENT_BUF_LEN)
+ .build()
+ .unwrap();
+ let clients = common::Clients {
+ rx: Rx::RecvMulti(common::BufRingProps {
+ buf_ring: buf_ring.clone(),
+ flags: None,
+ quiet_overflow: false,
+ }),
+ nodelay: CLIENT_NODELAY,
+ client_cnt: CLIENT_CNT,
+ send_cnt: SENDS_PER_CLIENT,
+ send_length: SEND_LENGTH,
+ };
+ let clients_done = common::ping_pong_clients(clients, listener_addr);
+
+ // Wait for the clients tasks to be done.
+
+ clients_done.await.unwrap()
+ })
+ });
+
 // Wait for the clients thread to finish.
- clients_handle.join().unwrap();
+ clients_handle.join().unwrap()
 }

From 5139ae36e2a57f08e9bcaa8f13e0a78bc987ec47 Mon Sep 17 00:00:00 2001
From: Frank Rehwinkel
Date: Mon, 27 Feb 2023 22:57:21 -0500
Subject: [PATCH 15/16] op: add accept_multi for TcpListener

And adds a unit test to tests/net.
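
A rough usage sketch (hedged; it mirrors the unit test added below, and
tokio::pin! is needed because accept_multi returns an unnameable Stream that
must be pinned before iteration):

    use tokio_stream::StreamExt;

    tokio_uring::start(async {
        let listener = tokio_uring::net::TcpListener::bind(
            "127.0.0.1:0".parse().unwrap(),
        )
        .unwrap();

        // One submission yields a stream of accepted connections.
        let streams = listener.accept_multi(None);
        tokio::pin!(streams);

        while let Some(stream) = streams.next().await {
            let stream = stream.unwrap();
            // Serve each accepted connection on its own task.
            tokio_uring::spawn(async move {
                let _ = stream;
            });
        }
    });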
---
 src/io/accept_multi.rs  | 64 +++++++++++++++++++++++++++++++++++++++++
 src/io/mod.rs           |  2 ++
 src/io/socket.rs        |  5 ++++
 src/net/tcp/listener.rs | 18 ++++++++++++
 tests/common/mod.rs     |  4 +++
 tests/net.rs            | 50 +++++++++++++++++++++++++++++++-
 6 files changed, 142 insertions(+), 1 deletion(-)
 create mode 100644 src/io/accept_multi.rs

diff --git a/src/io/accept_multi.rs b/src/io/accept_multi.rs
new file mode 100644
index 00000000..a7fc7030
--- /dev/null
+++ b/src/io/accept_multi.rs
@@ -0,0 +1,64 @@
+use crate::io::{SharedFd, Socket};
+
+use crate::runtime::driver::op::{CqeResult, MultiCQEStream, Op, Streamable};
+use crate::runtime::CONTEXT;
+use std::io;
+
+pub(crate) type AcceptMultiStream = Op<AcceptMulti, MultiCQEStream>;
+
+pub(crate) struct AcceptMulti {
+ /// Holds a strong ref to the FD, preventing the file from being closed
+ /// while the operation is in-flight.
+ _fd: SharedFd,
+}
+
+impl AcceptMultiStream {
+ /// Valid flags are:
+ ///
+ /// SOCK_NONBLOCK Set the O_NONBLOCK file status flag on the open file description
+ /// (see open(2)) referred to by the new file descriptor. Using this
+ /// flag saves extra calls to fcntl(2) to achieve the same result.
+ ///
+ /// SOCK_CLOEXEC Set the close-on-exec (FD_CLOEXEC) flag on the new file descriptor.
+ /// See the description of the O_CLOEXEC flag in open(2) for
+ /// reasons why this may be useful.
+ pub(crate) fn accept_multi(fd: &SharedFd, flags: Option<i32>) -> io::Result<Self> {
+ use io_uring::{opcode, types};
+
+ CONTEXT.with(|x| {
+ x.handle()
+ .expect("Not in a runtime context")
+ .submit_op_stream(AcceptMulti { _fd: fd.clone() }, |s| {
+ opcode::AcceptMulti::new(types::Fd(s._fd.raw_fd()))
+ .flags(flags.unwrap_or(0))
+ .build()
+ })
+ })
+ }
+}
+
+impl Streamable for AcceptMulti {
+ type Item = io::Result<Socket>;
+
+ fn stream_next(&self, cqe: CqeResult) -> Self::Item {
+ // The kernel is not expected to provide an error when the `more` bit was set,
+ // but it's just as easy to handle the case here.
+ let fd = cqe.result?;
+ let fd = SharedFd::new(fd as i32);
+ let socket = Socket { fd };
+ Ok(socket)
+ }
+
+ fn stream_complete(self, cqe: CqeResult) -> Option<Self::Item> {
+ // There is no None option returned by this stream completion method.
+ // The poll_next_op will return that once the stream is polled an additional time.
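+ // (Either way the final CQE is surfaced as the stream's last item: one
+ // more accepted fd on success, or the error that terminated the
+ // multishot accept.)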
+
+ let fd = match cqe.result {
+ Ok(result) => result,
+ Err(e) => return Some(Err(e)),
+ };
+ let fd = SharedFd::new(fd as i32);
+ let socket = Socket { fd };
+ Some(Ok(socket))
+ }
+}
diff --git a/src/io/mod.rs b/src/io/mod.rs
index a373e4be..35146c95 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -1,5 +1,7 @@
 mod accept;

+mod accept_multi;
+
 mod close;

 mod connect;
diff --git a/src/io/socket.rs b/src/io/socket.rs
index 383a61ab..1a950ca6 100644
--- a/src/io/socket.rs
+++ b/src/io/socket.rs
@@ -1,3 +1,4 @@
+use crate::io::accept_multi::AcceptMultiStream;
 use crate::io::recv::UnsubmittedRecv;
 use crate::io::recv_multi::RecvMultiStream;
 use crate::io::recv_provbuf::UnsubmittedRecvProvBuf;
@@ -215,6 +216,10 @@ impl Socket {
 op.await
 }

+ pub(crate) fn accept_multi(&self, flags: Option<i32>) -> AcceptMultiStream {
+ Op::accept_multi(&self.fd, flags).unwrap()
+ }
+
 pub(crate) async fn connect(&self, socket_addr: socket2::SockAddr) -> io::Result<()> {
 let op = Op::connect(&self.fd, socket_addr)?;
 op.await
diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs
index 98ca8fdd..8569a0d5 100644
--- a/src/net/tcp/listener.rs
+++ b/src/net/tcp/listener.rs
@@ -1,5 +1,6 @@
 use super::TcpStream;
 use crate::io::Socket;
+use futures_core::Stream;
 use std::{io, net::SocketAddr};

 /// A TCP socket server, listening for connections.
@@ -97,4 +98,21 @@ impl TcpListener {
 })?;
 Ok((stream, socket_addr))
 }
+
+ /// Accepts multiple incoming connections from this listener.
+ pub fn accept_multi(
+ &self,
+ flags: Option<i32>,
+ ) -> impl Stream<Item = io::Result<TcpStream>> + '_ {
+ use async_stream::stream;
+ use tokio::pin;
+ use tokio_stream::StreamExt;
+ stream! {
+ let s = self.inner.accept_multi(flags);
+ pin!(s);
+ while let Some(socket) = s.next().await {
+ yield socket.map(|s| TcpStream { inner: s })
+ }
+ }
+ }
 }
diff --git a/tests/common/mod.rs b/tests/common/mod.rs
index 413fc5d5..41069659 100644
--- a/tests/common/mod.rs
+++ b/tests/common/mod.rs
@@ -58,6 +58,10 @@ pub fn is_connection_reset_by_peer(e: &std::io::Error) -> bool {
 pub fn is_broken_pipe(e: &std::io::Error) -> bool {
 e.raw_os_error() == Some(32)
 }
+#[inline]
+pub fn is_too_many_open_files(e: &std::io::Error) -> bool {
+ e.raw_os_error() == Some(24)
+}

 async fn _client_ping_pong_recv_multi(
 stream: TcpStream,
diff --git a/tests/net.rs b/tests/net.rs
index 2b2600ed..2cdea25f 100644
--- a/tests/net.rs
+++ b/tests/net.rs
@@ -1,8 +1,9 @@
 use std::sync::mpsc::sync_channel;
 use std::thread;
 use tokio::io;
-
+use tokio_stream::StreamExt;
 use tokio_uring::buf::bufring;
+use tokio_uring::net::TcpStream;

 mod common;

@@ -434,3 +435,50 @@ fn net_tcp_ping_pong_recv_multishot_bufring_2_threads() -> io::Result<()> {
 // Wait for the clients thread to finish.
 clients_handle.join().unwrap()
 }
+
+#[test]
+fn net_accept_multi_1_thread() -> io::Result<()> {
+ const CONNECTIONS: usize = 4;
+
+ println!("Test to get {CONNECTIONS} TCP connections accepted");
+
+ tokio_uring::start(async {
+ let (listener, listener_addr) = common::tcp_listener().await.unwrap();
+
+ let mut clients: Vec<_> = vec![];
+
+ for _ in 0..CONNECTIONS {
+ let stream = TcpStream::connect(listener_addr).await;
+ let stream = match stream {
+ Ok(stream) => stream,
+ Err(e) => {
+ if common::is_too_many_open_files(&e) {
+ println!("expected: {}", e);
+ } else {
+ println!("unexpected: {}", e);
+ }
+ break;
+ }
+ };
+ clients.push(stream);
+ }
+
+ let streams = listener.accept_multi(None);
+ tokio::pin!(streams);
+
+ let mut connected_count: usize = 0;
+ while let Some(_stream) = streams.next().await {
+ connected_count += 1;
+ println!("another stream connected, number {connected_count}");
+
+ if connected_count == clients.len() {
+ println!("breaking out of loop");
+ break;
+ }
+ }
+ println!("loop done");
+ });
+ println!("tokio_uring::start done");
+
+ Ok(())
+}

From a89d4049445d58564a1d414aba99dbc1f3791e36 Mon Sep 17 00:00:00 2001
From: Frank Rehwinkel
Date: Tue, 28 Feb 2023 08:35:05 -0500
Subject: [PATCH 16/16] op: poll and poll_multi

---
 src/io/mod.rs        |  4 +++
 src/io/poll.rs       | 62 ++++++++++++++++++++++++++++++++++++++++
 src/io/poll_multi.rs | 49 ++++++++++++++++++++++++++++++++
 3 files changed, 115 insertions(+)
 create mode 100644 src/io/poll.rs
 create mode 100644 src/io/poll_multi.rs

diff --git a/src/io/mod.rs b/src/io/mod.rs
index 35146c95..f3dd967e 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -17,6 +17,10 @@ pub(crate) use noop::NoOp;

 mod open;

+mod poll;
+
+mod poll_multi;
+
 mod read;

 mod read_fixed;

diff --git a/src/io/poll.rs b/src/io/poll.rs
new file mode 100644
index 00000000..128c3982
--- /dev/null
+++ b/src/io/poll.rs
@@ -0,0 +1,62 @@
+use crate::{io::SharedFd, OneshotOutputTransform, UnsubmittedOneshot};
+use io_uring::cqueue::Entry;
+use std::io;
+
+/// An unsubmitted poll operation.
+///
+/// Used to call the IORING_OP_POLL_ADD operation.
+///
+/// The libc POLL flag bits are defined:
+///
+/// libc::{
+/// POLLIN,
+/// POLLPRI,
+/// POLLOUT,
+/// POLLERR,
+/// POLLHUP,
+/// POLLNVAL,
+/// POLLRDNORM,
+/// POLLRDBAND,
+/// }
+///
+/// #[cfg(not(any(target_arch = "sparc", target_arch = "sparc64")))]
+/// libc::POLLRDHUP
+/// #[cfg(any(target_arch = "sparc", target_arch = "sparc64"))]
+/// libc::POLLRDHUP
+pub type UnsubmittedPoll = UnsubmittedOneshot<PollData, PollTransform>;
+
+#[allow(missing_docs)]
+pub struct PollData {
+ /// Holds a strong ref to the FD, preventing the file from being closed
+ /// while the operation is in-flight.
+ _fd: SharedFd,
+}
+
+#[allow(missing_docs)]
+pub struct PollTransform {}
+
+impl OneshotOutputTransform for PollTransform {
+ type Output = io::Result<u32>;
+ type StoredData = PollData;
+
+ fn transform_oneshot_output(self, _data: Self::StoredData, cqe: Entry) -> Self::Output {
+ if cqe.result() >= 0 {
+ Ok(cqe.result() as u32)
+ } else {
+ Err(io::Error::from_raw_os_error(-cqe.result()))
+ }
+ }
+}
+
+impl UnsubmittedPoll {
+ #[allow(dead_code)]
+ pub(crate) fn poll(fd: &SharedFd, flags: u32) -> Self {
+ use io_uring::{opcode, types};
+
+ Self::new(
+ PollData { _fd: fd.clone() },
+ PollTransform {},
+ opcode::PollAdd::new(types::Fd(fd.raw_fd()), flags as _).build(),
+ )
+ }
+}
diff --git a/src/io/poll_multi.rs b/src/io/poll_multi.rs
new file mode 100644
index 00000000..bfdff957
--- /dev/null
+++ b/src/io/poll_multi.rs
@@ -0,0 +1,49 @@
+use crate::io::SharedFd;
+
+use crate::runtime::driver::op::{CqeResult, MultiCQEStream, Op, Streamable};
+use crate::runtime::CONTEXT;
+use std::io;
+
+#[allow(dead_code)]
+pub(crate) type PollMultiStream = Op<PollMulti, MultiCQEStream>;
+
+pub(crate) struct PollMulti {
+ /// Holds a strong ref to the FD, preventing the file from being closed
+ /// while the operation is in-flight.
+ _fd: SharedFd,
+}
+
+impl PollMultiStream {
+ #[allow(dead_code)]
+ pub(crate) fn poll_multi(fd: &SharedFd, flags: u32) -> io::Result<Self> {
+ use io_uring::{opcode, types};
+
+ CONTEXT.with(|x| {
+ x.handle()
+ .expect("Not in a runtime context")
+ .submit_op_stream(PollMulti { _fd: fd.clone() }, |_| {
+ opcode::PollAdd::new(types::Fd(fd.raw_fd()), flags as _)
+ .multi(true)
+ .build()
+ })
+ })
+ }
+}
+
+impl Streamable for PollMulti {
+ type Item = io::Result<u32>;
+
+ fn stream_next(&self, cqe: CqeResult) -> Self::Item {
+ // The kernel is not expected to provide an error when the `more` bit was set,
+ // but it's just as easy to handle the case here.
+ let res = cqe.result?;
+ Ok(res)
+ }
+
+ fn stream_complete(self, cqe: CqeResult) -> Option<Self::Item> {
+ match cqe.result {
+ Ok(res) => Some(Ok(res)),
+ Err(e) => Some(Err(e)),
+ }
+ }
+}
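
Note: neither poll nor poll_multi is wired to a public socket method yet;
both constructors are still marked dead_code. A hedged sketch of how a
poll_multi readiness stream might be consumed once a public wrapper exists.
The TcpStream::poll_multi method shown here is hypothetical and not part of
this series:

    use tokio_stream::StreamExt;

    // Hypothetical wrapper: assumed to hand back the PollMultiStream items,
    // one io::Result<u32> of revents bits per multishot completion.
    async fn watch_readable(stream: &tokio_uring::net::TcpStream) {
        let events = stream.poll_multi(libc::POLLIN as u32); // hypothetical
        tokio::pin!(events);
        while let Some(revents) = events.next().await {
            let revents = revents.unwrap();
            if revents & libc::POLLIN as u32 != 0 {
                // Readable; issue the actual read elsewhere.
            }
        }
    }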