From c7b0f7a446a8622c76c5a9794a07db24b8da426d Mon Sep 17 00:00:00 2001 From: Kyle Simpson Date: Mon, 3 Jun 2024 18:10:42 +0100 Subject: [PATCH 1/5] First attempt at thread creation/management. --- xde/src/lib.rs | 1 + xde/src/thread.rs | 74 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+) create mode 100644 xde/src/thread.rs diff --git a/xde/src/lib.rs b/xde/src/lib.rs index 726f1ef4..a7e87431 100644 --- a/xde/src/lib.rs +++ b/xde/src/lib.rs @@ -47,6 +47,7 @@ mod mac_sys; pub mod route; pub mod secpolicy; pub mod sys; +pub mod thread; pub mod xde; // On alignment, `kmem_alloc(9F)` has this of offer: diff --git a/xde/src/thread.rs b/xde/src/thread.rs new file mode 100644 index 00000000..8f03c321 --- /dev/null +++ b/xde/src/thread.rs @@ -0,0 +1,74 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +// Copyright 2024 Oxide Computer Company + +//! Illumos kthread support. + +use crate::ip::kthread_t; +use crate::ip::p0; +use crate::ip::thread_create; +use crate::ip::thread_exit; +use crate::ip::thread_join; +use crate::ip::TS_RUN; +use alloc::boxed::Box; +use core::ffi::c_void; +use core::ptr; +use core::ptr::addr_of_mut; +use core::ptr::NonNull; + +unsafe extern "C" fn kthread_body(arg: *mut c_void) { + let arg = arg as *mut Box; + let closure = unsafe { Box::from_raw(arg) }; + + closure(); + // closure used by val, so dropped before thread exit. + + unsafe { + thread_exit(); + } +} + +pub fn spawn(f: F) -> JoinHandle +where + F: FnOnce(), // -> T, + F: Send + 'static, + // T: Send + 'static, +{ + // A bit of an odd dance here -- we need to double box to get a thin + // pointer at the `into_raw` side. + let boxed = Box::new(f) as Box; + let arg = Box::into_raw(Box::new(boxed)); + let handle = unsafe { + thread_create( + ptr::null_mut(), + 0, // pulled up to default stack size. + // Typedef implies no args, reality implies args. Huh. + Some(core::mem::transmute::<_, unsafe extern "C" fn()>( + kthread_body as unsafe extern "C" fn(_), + )), + arg as *mut c_void, + 0, + addr_of_mut!(p0), + TS_RUN as i32, + 60, //minclsyspri + ) + }; + + let handle = NonNull::new(handle) + .expect("thread_create returned a null ptr, \ + but is documented as infallible"); + + JoinHandle { handle } +} + +pub struct JoinHandle { + handle: NonNull, +} + +impl JoinHandle { + pub fn join(self) { + unsafe { thread_join((*self.handle.as_ptr()).t_did) } + } +} From bd03942127ebf121747a4a811e321be297b28499 Mon Sep 17 00:00:00 2001 From: Kyle Simpson Date: Mon, 10 Jun 2024 11:30:10 +0100 Subject: [PATCH 2/5] Mailbox types. --- Cargo.lock | 13 ++- Cargo.toml | 2 +- crates/illumos-sys-hdrs/src/kernel.rs | 61 ++++++++++- crates/illumos-sys-hdrs/src/lib.rs | 3 +- lib/opte/src/ddi/sync.rs | 78 +++++++++++++- lib/opte/src/engine/packet.rs | 150 ++++++++++++++++++++++++-- xde/src/thread.rs | 7 +- 7 files changed, 298 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 721b63e4..6481dc05 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -921,6 +921,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.11" @@ -1213,7 +1222,7 @@ dependencies = [ "dyn-clone", "heapless", "illumos-sys-hdrs", - "itertools 0.12.1", + "itertools 0.13.0", "kstat-macro", "opte", "opte-api", @@ -1245,7 +1254,7 @@ dependencies = [ "clap", "criterion", "ctor", - "itertools 0.12.1", + "itertools 0.13.0", "nix", "opte", "opte-test-utils", diff --git a/Cargo.toml b/Cargo.toml index 7c2230d2..01ad78ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,7 +51,7 @@ darling = "0.20" dyn-clone = "1.0" heapless = "0.8" ipnetwork = { version = "0.20", default-features = false } -itertools = { version = "0.12", default-features = false } +itertools = { version = "0.13", default-features = false } libc = "0.2" libnet = { git = "https://github.com/oxidecomputer/netadm-sys" } nix = { version = "0.28", features = ["signal", "user"] } diff --git a/crates/illumos-sys-hdrs/src/kernel.rs b/crates/illumos-sys-hdrs/src/kernel.rs index ec510bc9..f19f0b2c 100644 --- a/crates/illumos-sys-hdrs/src/kernel.rs +++ b/crates/illumos-sys-hdrs/src/kernel.rs @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -// Copyright 2022 Oxide Computer Company +// Copyright 2024 Oxide Computer Company #![allow(clippy::missing_safety_doc)] @@ -80,6 +80,31 @@ impl krw_t { pub const RW_READER_STARVEWRITER: Self = Self(2); } +#[repr(C)] +pub struct kcondvar_t { + pub _opaque: c_ushort, +} + +#[repr(transparent)] +#[derive(Copy, Clone, Eq, PartialEq)] +pub struct kcv_type_t(pub c_int); +impl kcv_type_t { + pub const CV_DEFAULT: Self = Self(0); + pub const CV_DRIVER: Self = Self(1); +} + +#[repr(transparent)] +#[derive(Copy, Clone, Eq, PartialEq)] +pub struct time_res_t(pub c_int); +impl time_res_t { + pub const TR_NANOSEC: Self = Self(0); + pub const TR_MICROSEC: Self = Self(1); + pub const TR_MILLISEC: Self = Self(2); + pub const TR_SEC: Self = Self(3); + pub const TR_CLOCK_TICK: Self = Self(4); + pub const TR_COUNT: Self = Self(5); +} + extern "C" { type module_info; type module_stat; @@ -586,6 +611,40 @@ extern "C" { pub fn rw_tryupgrade(rwlp: *mut krwlock_t); pub fn rw_read_locked(rwlp: *mut krwlock_t); + pub fn cv_init( + cvp: *mut kcondvar_t, + name: *const c_char, + cv_type: kcv_type_t, + arg: *mut c_void, + ); + pub fn cv_destroy(cvp: *mut kcondvar_t); + pub fn cv_wait(cvp: *mut kcondvar_t, mp: *mut kmutex_t); + pub fn cv_signal(cvp: *mut kcondvar_t); + pub fn cv_broadcast(cvp: *mut kcondvar_t); + pub fn cv_wait_sig(cvp: *mut kcondvar_t, mp: *mut kmutex_t) -> c_int; + pub fn cv_timedwait( + cvp: *mut kcondvar_t, + mp: *mut kmutex_t, + timeout: clock_t, + ) -> clock_t; + pub fn cv_timedwait_sig( + cvp: *mut kcondvar_t, + mp: *mut kmutex_t, + timeout: clock_t, + ) -> clock_t; + pub fn cv_reltimedwait( + cvp: *mut kcondvar_t, + mp: *mut kmutex_t, + delta: clock_t, + res: time_res_t, + ) -> clock_t; + pub fn cv_reltimedwait_sig( + cvp: *mut kcondvar_t, + mp: *mut kmutex_t, + delta: clock_t, + res: time_res_t, + ) -> clock_t; + pub fn nochpoll() -> c_int; pub fn nodev() -> c_int; pub fn nulldev() -> c_int; diff --git a/crates/illumos-sys-hdrs/src/lib.rs b/crates/illumos-sys-hdrs/src/lib.rs index 1f52a7f7..96ebd66d 100644 --- a/crates/illumos-sys-hdrs/src/lib.rs +++ b/crates/illumos-sys-hdrs/src/lib.rs @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -// Copyright 2022 Oxide Computer Company +// Copyright 2024 Oxide Computer Company #![cfg_attr(feature = "kernel", feature(extern_types))] #![allow(non_camel_case_types)] #![no_std] @@ -307,6 +307,7 @@ pub type hrtime_t = c_longlong; // ====================================================================== // uts/common/sys/types.h // ====================================================================== +pub type clock_t = c_long; pub type datalink_id_t = uint32_t; pub type dev_t = c_ulong; pub type id_t = c_int; diff --git a/lib/opte/src/ddi/sync.rs b/lib/opte/src/ddi/sync.rs index 6050a738..63a866bf 100644 --- a/lib/opte/src/ddi/sync.rs +++ b/lib/opte/src/ddi/sync.rs @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -// Copyright 2022 Oxide Computer Company +// Copyright 2024 Oxide Computer Company //! Safe abstractions for synchronization primitives. //! @@ -16,12 +16,15 @@ cfg_if! { use core::cell::UnsafeCell; use core::ptr; use illumos_sys_hdrs::{ + cv_broadcast, cv_destroy, cv_init, cv_signal, cv_wait, + kcondvar_t, kcv_type_t, kmutex_t, krw_t, krwlock_t, mutex_enter, mutex_exit, mutex_destroy, mutex_init, rw_enter, rw_exit, rw_init, rw_destroy }; } else { use std::sync::Mutex; + use std::sync::Condvar; } } @@ -400,3 +403,76 @@ impl KRwLock { KRwLockWriteGuard { guard } } } + +#[cfg(all(not(feature = "std"), not(test)))] +pub struct KCondvar { + cv: UnsafeCell, +} + +#[cfg(any(feature = "std", test))] +pub struct KCondvar { + cv: Condvar, +} + +#[cfg(all(not(feature = "std"), not(test)))] +impl KCondvar { + pub fn new() -> Self { + let mut cv = kcondvar_t { _opaque: 0 }; + + unsafe { + cv_init( + &mut cv, + ptr::null_mut(), + kcv_type_t::CV_DRIVER, + ptr::null_mut(), + ); + } + + Self { cv: UnsafeCell::new(cv) } + } + + pub fn notify_one(&self) { + unsafe { cv_signal(self.cv.get()) } + } + + pub fn notify_all(&self) { + unsafe { cv_broadcast(self.cv.get()) } + } + + pub fn wait<'a, T: 'a>( + &self, + lock: KMutexGuard<'a, T>, + ) -> KMutexGuard<'a, T> { + unsafe { cv_wait(self.cv.get(), lock.lock.mutex.0.get()) } + lock + } +} + +#[cfg(any(feature = "std", test))] +impl KCondvar { + pub fn new() -> Self { + Self { cv: Condvar::new() } + } + + pub fn notify_one(&self) { + self.cv.notify_one() + } + + pub fn notify_all(&self) { + self.cv.notify_one() + } + + pub fn wait<'a, T: 'a>( + &self, + lock: KMutexGuard<'a, T>, + ) -> KMutexGuard<'a, T> { + KMutexGuard { guard: self.cv.wait(lock.guard).unwrap() } + } +} + +#[cfg(all(not(feature = "std"), not(test)))] +impl Drop for KCondvar { + fn drop(&mut self) { + unsafe { cv_destroy(self.cv.get()) }; + } +} diff --git a/lib/opte/src/engine/packet.rs b/lib/opte/src/engine/packet.rs index d9ac2107..f4980885 100644 --- a/lib/opte/src/engine/packet.rs +++ b/lib/opte/src/engine/packet.rs @@ -8,11 +8,6 @@ //! //! TODO //! -//! * Add a PacketChain type to represent a chain of one or more -//! indepenndent packets. Also consider having chains that represent -//! multiple packets for the same flow if it would be advantageous to -//! do so. -//! //! * Add hardware offload information to [`Packet`]. //! @@ -49,12 +44,16 @@ use super::ip6::Ipv6HdrError; use super::ip6::Ipv6Meta; use super::NetworkParser; use crate::d_error::DError; +use crate::ddi::sync::KCondvar; +use crate::ddi::sync::KMutex; +use alloc::collections::LinkedList; use core::fmt; use core::fmt::Display; use core::ptr; use core::ptr::NonNull; use core::result; use core::slice; +use core::sync::atomic::AtomicBool; use crc32fast::Hasher; use dyn_clone::DynClone; use serde::Deserialize; @@ -423,6 +422,7 @@ impl PacketMeta { struct PacketChainInner { head: NonNull, tail: NonNull, + len: usize, } /// A chain of network packets. @@ -460,11 +460,21 @@ impl PacketChain { // Walk the chain to find the tail, and support faster append. let mut tail = head; + let mut len = 0; while let Some(next_ptr) = NonNull::new((*tail.as_ptr()).b_next) { + len += 1; tail = next_ptr; } - Ok(Self { inner: Some(PacketChainInner { head, tail }) }) + Ok(Self { inner: Some(PacketChainInner { head, tail, len }) }) + } + + pub fn len(&self) -> usize { + self.inner.as_ref().map(|v| v.len).unwrap_or_default() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 } /// Removes the next packet from the top of the chain and returns @@ -527,8 +537,10 @@ impl PacketChain { // pkt_p->b_next is already null. } list.tail = pkt; + list.len += 1; } else { - self.inner = Some(PacketChainInner { head: pkt, tail: pkt }); + self.inner = + Some(PacketChainInner { head: pkt, tail: pkt, len: 1 }); } } @@ -538,6 +550,31 @@ impl PacketChain { pub fn unwrap_mblk(mut self) -> Option> { self.inner.take().map(|v| v.head) } + + /// Append another `PacketChain` to the end of this one. + pub fn extend(&mut self, mut other: Self) { + match (&mut self.inner, other.inner.take()) { + // Append them to us. + (Some(my_inner), Some(their_inner)) => { + // link our tail with their head. + let old_tail_p = my_inner.tail.as_ptr(); + let their_head_p = their_inner.head.as_ptr(); + unsafe { + (*old_tail_p).b_next = their_head_p; + (*their_head_p).b_prev = old_tail_p; + } + + my_inner.tail = their_inner.tail; + my_inner.len += their_inner.len; + } + // replace with their inner. + (None, their_inner @ Some(_)) => { + self.inner = their_inner; + } + // Append an empty list: no-op. + (_, None) => {} + } + } } impl Drop for PacketChain { @@ -562,6 +599,105 @@ impl Drop for PacketChain { } } +/// A `PacketChain` plus per-element metadata. +// using linked list, probably want to swap vecdeques in and out. +pub struct PacketChainAnd { + packets: PacketChain, + metadata: LinkedList, +} + +impl Default for PacketChainAnd { + fn default() -> Self { + Self { packets: PacketChain::empty(), metadata: LinkedList::new() } + } +} + +impl Iterator for PacketChainAnd { + type Item = (Packet, T); + + fn next(&mut self) -> Option { + self.packets + .pop_front() + .and_then(|el| self.metadata.pop_front().map(|meta| (el, meta))) + } +} + +/// Not quite an SQueue. +// TODO: work in signalling/condvar +pub struct EssQueue { + inner: KMutex>, + cv: KCondvar, + watermark: usize, + kill: AtomicBool, +} + +impl EssQueue { + pub fn new(watermark: usize) -> Self { + Self { + inner: KMutex::new( + Default::default(), + crate::ddi::sync::KMutexType::Driver, + ), + cv: KCondvar::new(), + watermark, + kill: false.into(), + } + } + + // TODO: want in future to maybe have T be derived from the packet (e.g., parsed). + pub fn deliver( + &self, + packets: PacketChain, + mut f: impl FnMut() -> T, + ) -> Result<(), EssQueueDeliverError> { + // pre-prepare list of elements to push at back. + let mut els: LinkedList = (0..packets.len()).map(|_| f()).collect(); + + let mut workspace = self.inner.lock(); + + if workspace.packets.len() > self.watermark { + return Err(EssQueueDeliverError::Full); + } + + workspace.packets.extend(packets); + workspace.metadata.append(&mut els); + + // We can notify with/without the lock, but illumos prefers + // we do so with the lock 'for scheduling purposes'. + self.cv.notify_one(); + + Ok(()) + } + + // Probably want there to be a timeout here rather than just a kill flag. + pub fn receive(&self) -> Result, EssQueueReceiveError> { + let mut workspace = self.inner.lock(); + + while workspace.packets.is_empty() { + if self.kill.load(core::sync::atomic::Ordering::Relaxed) { + return Err(EssQueueReceiveError::Dead); + } + + workspace = self.cv.wait(workspace); + } + + Ok(core::mem::take(&mut workspace)) + } + + pub fn quiesce(&self) { + self.kill.store(true, core::sync::atomic::Ordering::Relaxed); + self.cv.notify_all(); + } +} + +pub enum EssQueueDeliverError { + Full, +} + +pub enum EssQueueReceiveError { + Dead, +} + /// A network packet. /// /// The [`Packet`] type presents an abstraction for manipulating diff --git a/xde/src/thread.rs b/xde/src/thread.rs index 8f03c321..42f2e6e1 100644 --- a/xde/src/thread.rs +++ b/xde/src/thread.rs @@ -56,9 +56,10 @@ where ) }; - let handle = NonNull::new(handle) - .expect("thread_create returned a null ptr, \ - but is documented as infallible"); + let handle = NonNull::new(handle).expect( + "thread_create returned a null ptr, \ + but is documented as infallible", + ); JoinHandle { handle } } From 491dc1ee28b5d2be5a44ac4a81370f21c0ac87d1 Mon Sep 17 00:00:00 2001 From: Kyle Simpson Date: Wed, 12 Jun 2024 15:17:11 +0100 Subject: [PATCH 3/5] A (very slow/contended) worker thread --- dtrace/opte-count-cycles.d | 4 +- lib/opte/src/ddi/sync.rs | 32 +++++++- lib/opte/src/engine/packet.rs | 20 ++++- xde/src/xde.rs | 143 ++++++++++++++++++++++++++++++++-- 4 files changed, 184 insertions(+), 15 deletions(-) diff --git a/dtrace/opte-count-cycles.d b/dtrace/opte-count-cycles.d index b5b8a1e7..609dab21 100644 --- a/dtrace/opte-count-cycles.d +++ b/dtrace/opte-count-cycles.d @@ -6,12 +6,12 @@ xde_rx:entry { self->ts = vtimestamp; } -xde_mc_tx:return /self->ts/ { +xde_mc_tx_one:return /self->ts/ { @time["tx"] = lquantize((vtimestamp - self->ts), 256, 32768, 256); self->ts = 0; } -xde_rx:return /self->ts/ { +xde_rx_one:return /self->ts/ { @time["rx"] = lquantize((vtimestamp - self->ts), 256, 32768, 256); self->ts = 0; } diff --git a/lib/opte/src/ddi/sync.rs b/lib/opte/src/ddi/sync.rs index 63a866bf..7958dadf 100644 --- a/lib/opte/src/ddi/sync.rs +++ b/lib/opte/src/ddi/sync.rs @@ -18,8 +18,9 @@ cfg_if! { use illumos_sys_hdrs::{ cv_broadcast, cv_destroy, cv_init, cv_signal, cv_wait, kcondvar_t, kcv_type_t, - kmutex_t, krw_t, krwlock_t, mutex_enter, mutex_exit, - mutex_destroy, mutex_init, rw_enter, rw_exit, rw_init, + kmutex_t, krw_t, krwlock_t, mutex_destroy, mutex_enter, + mutex_exit, + mutex_init, mutex_tryenter, rw_enter, rw_exit, rw_init, rw_destroy }; } else { @@ -137,10 +138,23 @@ impl KMutex { unsafe { mutex_enter(self.mutex.0.get()) }; KMutexGuard { lock: self } } + + pub fn try_lock(&self) -> Result, LockTaken> { + let try_lock = unsafe { + mutex_tryenter(self.mutex.0.get()) + }; + if try_lock != 0 { + KMutexGuard { lock: self } + } else { + Err(LockTaken) + } + } } -unsafe impl Send for KMutex {} -unsafe impl Sync for KMutex {} +pub struct LockTaken; + +unsafe impl Send for KCondvar {} +unsafe impl Sync for KCondvar {} #[cfg(all(not(feature = "std"), not(test)))] pub struct KMutexGuard<'a, T: 'a> { @@ -215,6 +229,13 @@ impl KMutex { let guard = self.inner.lock().unwrap(); KMutexGuard { guard } } + + pub fn try_lock(&self) -> Result, LockTaken> { + self.inner.try_lock().map_err(|err| match err { + std::sync::TryLockError::Poisoned(_) => panic!("oops"), + std::sync::TryLockError::WouldBlock => LockTaken, + }) + } } /// A wrapper around illumos rwlock(9F) @@ -404,6 +425,9 @@ impl KRwLock { } } +unsafe impl Send for KMutex {} +unsafe impl Sync for KMutex {} + #[cfg(all(not(feature = "std"), not(test)))] pub struct KCondvar { cv: UnsafeCell, diff --git a/lib/opte/src/engine/packet.rs b/lib/opte/src/engine/packet.rs index f4980885..2ff25965 100644 --- a/lib/opte/src/engine/packet.rs +++ b/lib/opte/src/engine/packet.rs @@ -49,6 +49,7 @@ use crate::ddi::sync::KMutex; use alloc::collections::LinkedList; use core::fmt; use core::fmt::Display; +use core::num::NonZeroUsize; use core::ptr; use core::ptr::NonNull; use core::result; @@ -460,7 +461,7 @@ impl PacketChain { // Walk the chain to find the tail, and support faster append. let mut tail = head; - let mut len = 0; + let mut len = 1; while let Some(next_ptr) = NonNull::new((*tail.as_ptr()).b_next) { len += 1; tail = next_ptr; @@ -599,6 +600,9 @@ impl Drop for PacketChain { } } +unsafe impl Send for PacketChain {} +unsafe impl Sync for PacketChain {} + /// A `PacketChain` plus per-element metadata. // using linked list, probably want to swap vecdeques in and out. pub struct PacketChainAnd { @@ -606,6 +610,12 @@ pub struct PacketChainAnd { metadata: LinkedList, } +impl PacketChainAnd { + pub fn len(&self) -> usize { + self.packets.len() + } +} + impl Default for PacketChainAnd { fn default() -> Self { Self { packets: PacketChain::empty(), metadata: LinkedList::new() } @@ -627,12 +637,12 @@ impl Iterator for PacketChainAnd { pub struct EssQueue { inner: KMutex>, cv: KCondvar, - watermark: usize, + watermark: Option, kill: AtomicBool, } impl EssQueue { - pub fn new(watermark: usize) -> Self { + pub fn new(watermark: Option) -> Self { Self { inner: KMutex::new( Default::default(), @@ -655,7 +665,7 @@ impl EssQueue { let mut workspace = self.inner.lock(); - if workspace.packets.len() > self.watermark { + if self.watermark.map(|w| workspace.packets.len() > w.into()).unwrap_or(false) { return Err(EssQueueDeliverError::Full); } @@ -4137,6 +4147,8 @@ mod test { let mut chain = unsafe { PacketChain::new(els[0]) }.unwrap(); let pkt = unsafe { Packet::wrap_mblk(new_el) }.unwrap(); + assert_eq!(chain.len(), els.len()); + chain.append(pkt); // Chain head/tail ptrs are correct diff --git a/xde/src/xde.rs b/xde/src/xde.rs index 7d46d457..27ee60dd 100644 --- a/xde/src/xde.rs +++ b/xde/src/xde.rs @@ -28,6 +28,8 @@ use crate::route::RouteCache; use crate::route::RouteKey; use crate::secpolicy; use crate::sys; +use crate::thread::spawn; +use crate::thread::JoinHandle; use crate::warn; use alloc::boxed::Box; use alloc::ffi::CString; @@ -37,6 +39,7 @@ use alloc::sync::Arc; use alloc::vec::Vec; use core::ffi::CStr; use core::num::NonZeroU32; +use core::num::NonZeroUsize; use core::ptr; use core::ptr::addr_of; use core::ptr::addr_of_mut; @@ -64,6 +67,7 @@ use opte::engine::headers::EncapMeta; use opte::engine::headers::IpAddr; use opte::engine::ioctl::{self as api}; use opte::engine::ip6::Ipv6Addr; +use opte::engine::packet::EssQueue; use opte::engine::packet::Initialized; use opte::engine::packet::InnerFlowId; use opte::engine::packet::Packet; @@ -145,6 +149,14 @@ extern "C" { pub fn __dtrace_probe_hdlr__resp(resp_str: uintptr_t); pub fn __dtrace_probe_rx(mp: uintptr_t); pub fn __dtrace_probe_tx(mp: uintptr_t); + + pub fn __dtrace_probe_worker__start(n_pkts: uintptr_t); + pub fn __dtrace_probe_worker__end(); + + pub fn __dtrace_probe_worker__pkt__start(dir: uintptr_t); + pub fn __dtrace_probe_worker__pkt__end(); + + pub fn __dtrace_probe_worker__no__space(dir: uintptr_t, n_pkts: uintptr_t); } fn bad_packet_parse_probe( @@ -226,6 +238,9 @@ struct XdeState { vpc_map: Arc, v2b: Arc, underlay: KMutex>, + + deliver: Arc>, + deliver_hdl: Option, } struct UnderlayState { @@ -248,11 +263,21 @@ fn get_xde_state() -> &'static XdeState { impl XdeState { fn new() -> Self { let ectx = Arc::new(ExecCtx { log: Box::new(opte::KernelLog {}) }); + + // Completely arbitrary watermark lmao. + let deliver = Arc::new(EssQueue::new(Some(NonZeroUsize::new(usize::MAX).unwrap()))); + let worker_mailbox = deliver.clone(); + + let deliver_hdl = Some(spawn(|| xde_worker(worker_mailbox))); + XdeState { underlay: KMutex::new(None, KMutexType::Driver), ectx, vpc_map: Arc::new(overlay::VpcMappings::new()), v2b: Arc::new(overlay::Virt2Boundary::new()), + + deliver, + deliver_hdl, } } } @@ -1254,7 +1279,13 @@ unsafe extern "C" fn xde_detach( // Reattach the XdeState to a Box, which takes ownership and will // free it on drop. - drop(Box::from_raw(state)); + let mut xde = Box::from_raw(state); + + // End the worker thread. + xde.deliver.quiesce(); + xde.deliver_hdl.take().unwrap().join(); + + drop(xde); // Remove control device ddi_remove_minor_node(xde_dip, XDE_STR); @@ -1530,8 +1561,19 @@ unsafe extern "C" fn xde_mc_tx( // by the mch they're being targeted to. E.g., either build a list // of chains (u1, u2, port0, port1, ...), or hold tx until another // packet breaks the run targeting the same dest. - while let Some(pkt) = chain.pop_front() { - xde_mc_tx_one(src_dev, pkt); + // while let Some(pkt) = chain.pop_front() { + // xde_mc_tx_one(src_dev, pkt); + // } + + let n_pkts = chain.len(); + + if let Err(_) = get_xde_state() + .deliver + .deliver(chain, || Origin::Port(src_dev.devname.clone())) + { + unsafe { + __dtrace_probe_worker__no__space(Direction::Out as _, n_pkts); + } } ptr::null_mut() @@ -1678,6 +1720,85 @@ unsafe fn xde_mc_tx_one( ptr::null_mut() } +// Doing it this wey for now because passing Arcs will really +// complicate the detach flow + safety (e.g., Arcs to ports/ulays +// outliving the quiesced callbacks)... +// Strings suck but this is a POC, so w/e. +enum Origin { + Underlay(usize), + Port(String), +} + +// TODO: put somewhere sane + +pub const CPU_BEST: c_int = -4; + +extern "C" { + pub fn affinity_set(affinity: c_int); +} + +fn xde_worker(mailbox: Arc>) { + opte::engine::err!("XDE WORKER STARTED"); + + unsafe { affinity_set(CPU_BEST) }; + + while let Ok(packets) = mailbox.receive() { + unsafe { + __dtrace_probe_worker__start(packets.len()); + } + + let devs = unsafe { xde_devs.read() }; + let xde = get_xde_state(); + let ulay = xde.underlay.lock(); + + // TODO: hold packets out here for batched dispatch. + for (packet, origin) in packets { + let dir = if let Origin::Underlay(_) = &origin { + Direction::In + } else { + Direction::Out + }; + unsafe { + __dtrace_probe_worker__pkt__start(dir as _); + } + + match origin { + Origin::Port(port_name) => { + let my_port = + devs.iter().find(|port| port.devname == port_name); + + if let Some(port) = my_port { + unsafe { + xde_mc_tx_one(port, packet); + } + } + } + Origin::Underlay(0) => unsafe { + if let Some(ulay) = &*ulay { + xde_rx_one(&ulay.u1.mch, ptr::null_mut(), packet); + } + }, + Origin::Underlay(1) => unsafe { + if let Some(ulay) = &*ulay { + xde_rx_one(&ulay.u2.mch, ptr::null_mut(), packet); + } + }, + Origin::Underlay(n) => panic!("I don't have an {n}th underlay"), + }; + + unsafe { + __dtrace_probe_worker__pkt__end(); + } + } + + unsafe { + __dtrace_probe_worker__end(); + } + } + + opte::engine::err!("XDE WORKER EXITED"); +} + /// This is a generic wrapper for references that should be dropped once not in /// use. pub(crate) struct DropRef @@ -1832,8 +1953,20 @@ unsafe extern "C" fn xde_rx( // by the mch they're being targeted to. E.g., either build a list // of chains (port0, port1, ...), or hold tx until another // packet breaks the run targeting the same dest. - while let Some(pkt) = chain.pop_front() { - xde_rx_one(&mch, mrh, pkt); + // while let Some(pkt) = chain.pop_front() { + // xde_rx_one(&mch, mrh, pkt); + // } + + let n_pkts = chain.len(); + + // Eh... The only reason I can get away with this is because we never + // Hairpin anything on the underlay, for good reason lmao. + if let Err(_) = + get_xde_state().deliver.deliver(chain, || Origin::Underlay(0)) + { + unsafe { + __dtrace_probe_worker__no__space(Direction::In as _, n_pkts); + } } } From b3e4c84d5defd2a5d80b07ab1ad52285473fb21d Mon Sep 17 00:00:00 2001 From: Kyle Simpson Date: Thu, 13 Jun 2024 10:32:32 +0100 Subject: [PATCH 4/5] Less contention, still absurdly slow packet dropoff. --- lib/opte/src/ddi/sync.rs | 17 ++++---- lib/opte/src/engine/packet.rs | 79 ++++++++++++++++++++++++++++++----- xde/src/xde.rs | 8 ++-- 3 files changed, 82 insertions(+), 22 deletions(-) diff --git a/lib/opte/src/ddi/sync.rs b/lib/opte/src/ddi/sync.rs index 7958dadf..5246c1fd 100644 --- a/lib/opte/src/ddi/sync.rs +++ b/lib/opte/src/ddi/sync.rs @@ -140,11 +140,9 @@ impl KMutex { } pub fn try_lock(&self) -> Result, LockTaken> { - let try_lock = unsafe { - mutex_tryenter(self.mutex.0.get()) - }; + let try_lock = unsafe { mutex_tryenter(self.mutex.0.get()) }; if try_lock != 0 { - KMutexGuard { lock: self } + Ok(KMutexGuard { lock: self }) } else { Err(LockTaken) } @@ -231,10 +229,13 @@ impl KMutex { } pub fn try_lock(&self) -> Result, LockTaken> { - self.inner.try_lock().map_err(|err| match err { - std::sync::TryLockError::Poisoned(_) => panic!("oops"), - std::sync::TryLockError::WouldBlock => LockTaken, - }) + self.inner + .try_lock() + .map_err(|err| match err { + std::sync::TryLockError::Poisoned(_) => panic!("oops"), + std::sync::TryLockError::WouldBlock => LockTaken, + }) + .map(|guard| KMutexGuard { guard }) } } diff --git a/lib/opte/src/engine/packet.rs b/lib/opte/src/engine/packet.rs index 2ff25965..65eccf36 100644 --- a/lib/opte/src/engine/packet.rs +++ b/lib/opte/src/engine/packet.rs @@ -55,6 +55,7 @@ use core::ptr::NonNull; use core::result; use core::slice; use core::sync::atomic::AtomicBool; +use core::sync::atomic::AtomicUsize; use crc32fast::Hasher; use dyn_clone::DynClone; use serde::Deserialize; @@ -614,6 +615,15 @@ impl PacketChainAnd { pub fn len(&self) -> usize { self.packets.len() } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn extend(&mut self, mut other: Self) { + self.packets.extend(other.packets); + self.metadata.append(&mut other.metadata); + } } impl Default for PacketChainAnd { @@ -632,21 +642,43 @@ impl Iterator for PacketChainAnd { } } +// to test: double-buffer inner with shared tick? + +struct WriteSlot { + inner: KMutex>, + pkts: AtomicUsize, +} + +impl Default for WriteSlot { + fn default() -> Self { + Self { + inner: KMutex::new( + Default::default(), + crate::ddi::sync::KMutexType::Spin, + ), + pkts: 0.into(), + } + } +} + /// Not quite an SQueue. -// TODO: work in signalling/condvar pub struct EssQueue { - inner: KMutex>, + slots: [WriteSlot; 2], + sleepy: KMutex<()>, cv: KCondvar, watermark: Option, kill: AtomicBool, } +// NOTE: this is a really dumb proxy for per-port. + impl EssQueue { pub fn new(watermark: Option) -> Self { Self { - inner: KMutex::new( + slots: Default::default(), + sleepy: KMutex::new( Default::default(), - crate::ddi::sync::KMutexType::Driver, + crate::ddi::sync::KMutexType::Spin, ), cv: KCondvar::new(), watermark, @@ -657,23 +689,34 @@ impl EssQueue { // TODO: want in future to maybe have T be derived from the packet (e.g., parsed). pub fn deliver( &self, + slot: usize, packets: PacketChain, mut f: impl FnMut() -> T, ) -> Result<(), EssQueueDeliverError> { + let n = packets.len(); // pre-prepare list of elements to push at back. - let mut els: LinkedList = (0..packets.len()).map(|_| f()).collect(); + let mut els: LinkedList = (0..n).map(|_| f()).collect(); - let mut workspace = self.inner.lock(); + let mut workspace = self.slots[slot].inner.lock(); - if self.watermark.map(|w| workspace.packets.len() > w.into()).unwrap_or(false) { + if self + .watermark + .map(|w| workspace.packets.len() > w.into()) + .unwrap_or(false) + { return Err(EssQueueDeliverError::Full); } workspace.packets.extend(packets); workspace.metadata.append(&mut els); + self.slots[slot] + .pkts + .fetch_add(n, core::sync::atomic::Ordering::Relaxed); + // We can notify with/without the lock, but illumos prefers // we do so with the lock 'for scheduling purposes'. + drop(workspace); self.cv.notify_one(); Ok(()) @@ -681,17 +724,31 @@ impl EssQueue { // Probably want there to be a timeout here rather than just a kill flag. pub fn receive(&self) -> Result, EssQueueReceiveError> { - let mut workspace = self.inner.lock(); + let mut pkts = PacketChainAnd::default(); - while workspace.packets.is_empty() { + while pkts.is_empty() { if self.kill.load(core::sync::atomic::Ordering::Relaxed) { return Err(EssQueueReceiveError::Dead); } - workspace = self.cv.wait(workspace); + for slot in &self.slots { + if slot.pkts.load(core::sync::atomic::Ordering::Relaxed) == 0 { + continue; + } + let mut workspace = slot.inner.lock(); + + let taken = core::mem::take(&mut *workspace); + slot.pkts.store(0, core::sync::atomic::Ordering::Relaxed); + drop(workspace); + + pkts.extend(taken); + } + + let a = self.sleepy.lock(); + self.cv.wait(a); } - Ok(core::mem::take(&mut workspace)) + Ok(pkts) } pub fn quiesce(&self) { diff --git a/xde/src/xde.rs b/xde/src/xde.rs index 27ee60dd..8adcf79b 100644 --- a/xde/src/xde.rs +++ b/xde/src/xde.rs @@ -265,7 +265,9 @@ impl XdeState { let ectx = Arc::new(ExecCtx { log: Box::new(opte::KernelLog {}) }); // Completely arbitrary watermark lmao. - let deliver = Arc::new(EssQueue::new(Some(NonZeroUsize::new(usize::MAX).unwrap()))); + let deliver = Arc::new(EssQueue::new(Some( + NonZeroUsize::new(usize::MAX).unwrap(), + ))); let worker_mailbox = deliver.clone(); let deliver_hdl = Some(spawn(|| xde_worker(worker_mailbox))); @@ -1569,7 +1571,7 @@ unsafe extern "C" fn xde_mc_tx( if let Err(_) = get_xde_state() .deliver - .deliver(chain, || Origin::Port(src_dev.devname.clone())) + .deliver(1, chain, || Origin::Port(src_dev.devname.clone())) { unsafe { __dtrace_probe_worker__no__space(Direction::Out as _, n_pkts); @@ -1962,7 +1964,7 @@ unsafe extern "C" fn xde_rx( // Eh... The only reason I can get away with this is because we never // Hairpin anything on the underlay, for good reason lmao. if let Err(_) = - get_xde_state().deliver.deliver(chain, || Origin::Underlay(0)) + get_xde_state().deliver.deliver(0, chain, || Origin::Underlay(0)) { unsafe { __dtrace_probe_worker__no__space(Direction::In as _, n_pkts); From de1ff57de390578becd6c0be4c47a398c03b5386 Mon Sep 17 00:00:00 2001 From: Kyle Simpson Date: Fri, 14 Jun 2024 12:14:56 +0100 Subject: [PATCH 5/5] Fixup kbench, stick thread into poll for 'fun' --- bench/src/kbench/measurement.rs | 5 ++-- dtrace/flow.d | 16 +++++++++++++ dtrace/opte-count-cycles-oneliner.d | 1 + dtrace/opte-count-cycles-os.d | 1 + dtrace/opte-count-cycles.d | 36 +++++++++++++++++++++++++++++ dtrace/opte-tcp-flowdrop.d | 31 +++++++++++++++++++++++++ lib/opte/src/engine/packet.rs | 5 ++-- 7 files changed, 91 insertions(+), 4 deletions(-) create mode 100644 dtrace/flow.d create mode 100644 dtrace/opte-count-cycles-oneliner.d create mode 100644 dtrace/opte-count-cycles-os.d create mode 100644 dtrace/opte-tcp-flowdrop.d diff --git a/bench/src/kbench/measurement.rs b/bench/src/kbench/measurement.rs index 6ea55e83..b0482344 100644 --- a/bench/src/kbench/measurement.rs +++ b/bench/src/kbench/measurement.rs @@ -182,8 +182,9 @@ pub fn build_flamegraph( } let terms = [ - ("xde_rx", rx_name.unwrap_or("rx")), - ("xde_mc_tx", tx_name.unwrap_or("tx")), + ("xde_rx", rx_name.unwrap_or("in_place")), + ("xde_mc_tx", tx_name.unwrap_or("out_place")), + ("xde_worker", "process"), ]; for (tracked_fn, out_name) in terms { diff --git a/dtrace/flow.d b/dtrace/flow.d new file mode 100644 index 00000000..7754605b --- /dev/null +++ b/dtrace/flow.d @@ -0,0 +1,16 @@ +mac_client_set_flow_cb:entry { + printf("entry: mip %p mrh %p mp %p", + arg0, arg1, arg2); +} + +mac_client_set_flow_cb:return { + printf("donezo off %p val %p", arg0, arg1); +} + +flow_transport_lport_match:entry { + printf("entry: mip %p mrh %p mp %p", arg0, arg1, arg2); +} + +flow_transport_lport_match:return { + printf("donezo off %p val %p", arg0, arg1); +} diff --git a/dtrace/opte-count-cycles-oneliner.d b/dtrace/opte-count-cycles-oneliner.d new file mode 100644 index 00000000..e386e4b7 --- /dev/null +++ b/dtrace/opte-count-cycles-oneliner.d @@ -0,0 +1 @@ +worker-pkt-start { self->ts = vtimestamp; self->dir = arg0; } worker-pkt-end /self->dir == 1 && self->ts/ { @time["rx"] = lquantize((vtimestamp - self->ts), 256, 32768, 256); self->ts = 0;} worker-pkt-end /self->dir == 2 && self->ts/ {@time["tx"] = lquantize((vtimestamp - self->ts), 256, 32768, 256); self->ts = 0;} END {} \ No newline at end of file diff --git a/dtrace/opte-count-cycles-os.d b/dtrace/opte-count-cycles-os.d new file mode 100644 index 00000000..189fc61f --- /dev/null +++ b/dtrace/opte-count-cycles-os.d @@ -0,0 +1 @@ +xde_rx:entry { self->ts = vtimestamp; self->dir = 1; } xde_mc_tx:entry { self->ts = vtimestamp; self->dir = 2; } xde_rx:return /self->ts/ { @time["rx"] = lquantize((vtimestamp - self->ts), 256, 32768, 256); self->ts = 0;} xde_mc_tx:return /self->ts/ {@time["tx"] = lquantize((vtimestamp - self->ts), 256, 32768, 256); self->ts = 0;} END {} \ No newline at end of file diff --git a/dtrace/opte-count-cycles.d b/dtrace/opte-count-cycles.d index cfb5ae5b..ab271796 100644 --- a/dtrace/opte-count-cycles.d +++ b/dtrace/opte-count-cycles.d @@ -16,6 +16,42 @@ worker-pkt-end { self->dir = 0; } +xde_rx:entry { + self->drop_time = vtimestamp; +} + +xde_mc_tx:entry { + self->drop_time = vtimestamp; +} + +xde_rx:return /self->dir/ { + @time["place_in_inner"] = lquantize((vtimestamp - self->ts), 256, 32768, 256); + self->drop_time = 0; +} + +xde_mc_tx:return /self->dir/ { + @time["place_out_inner"] = lquantize((vtimestamp - self->ts), 256, 32768, 256); + self->drop_time = 0; +} + +xde_rx:return /!self->dir/ { + @time["place_in"] = lquantize((vtimestamp - self->ts), 256, 32768, 256); + self->drop_time = 0; +} + +xde_mc_tx:return /!self->dir/ { + @time["place_out"] = lquantize((vtimestamp - self->ts), 256, 32768, 256); + self->drop_time = 0; +} + +xde_rx:return { + self->drop_time = 0; +} + +xde_mc_tx:return { + self->drop_time = 0; +} + END { } \ No newline at end of file diff --git a/dtrace/opte-tcp-flowdrop.d b/dtrace/opte-tcp-flowdrop.d new file mode 100644 index 00000000..8b430548 --- /dev/null +++ b/dtrace/opte-tcp-flowdrop.d @@ -0,0 +1,31 @@ +/* + * Track bad packets as they happen. + * + * dtrace -L ./lib -I . -Cqs ./opte-bad-packet.d + */ +#include "common.h" + +#define HDR_FMT "%-12s %-3s %-18s %s\n" +#define LINE_FMT "%-12s %-3s 0x%-16p %s\n" + +BEGIN { + printf(HDR_FMT, "PORT", "DIR", "MBLK", "MSG"); + num = 0; +} + +tcp-err { + this->dir = DIR_STR(arg0); + this->port = stringof(arg1); + this->flow_id = stringof(arg2); + this->mblk = arg3; + this->msg = stringof(arg4); + + if (num >= 10) { + printf(HDR_FMT, "PORT", "DIR", "MBLK", "MSG"); + num = 0; + } + + printf(LINE_FMT, this->port, this->dir, this->mblk, this->msg); + stack(); + num++; +} diff --git a/lib/opte/src/engine/packet.rs b/lib/opte/src/engine/packet.rs index 65eccf36..7083d922 100644 --- a/lib/opte/src/engine/packet.rs +++ b/lib/opte/src/engine/packet.rs @@ -55,6 +55,7 @@ use core::ptr::NonNull; use core::result; use core::slice; use core::sync::atomic::AtomicBool; +use core::sync::atomic::AtomicU64; use core::sync::atomic::AtomicUsize; use crc32fast::Hasher; use dyn_clone::DynClone; @@ -744,8 +745,8 @@ impl EssQueue { pkts.extend(taken); } - let a = self.sleepy.lock(); - self.cv.wait(a); + // let a = self.sleepy.lock(); + // self.cv.wait(a); } Ok(pkts)