From 6a64a519484cf6cb93301ab766258d7515ff192a Mon Sep 17 00:00:00 2001 From: Patrick Mooney Date: Mon, 18 Sep 2023 18:16:57 -0500 Subject: [PATCH] Overhaul block device interfaces This reshapes how block devices (frontends) interact with their attached backends. Rather than associating a backend with a device when it is created, backend and device are attached after the fact, when each has had an opportunity to be initialized. Through its attachment handle, the device is able to notify the backend of new requests, but also query sizing parameters (previously specified during device construction), pause and unpause request retrieval, and (in the future) perform other tasks such as cache mode alteration. The backend has a corresponding attachment handle through which it retrieves pending requests from the device -- behavior which is largely unchanged from the original structure. Rather than store device-specific information required to issue request completions to the guest, the device emulation is expecting to use a `Tracking` structure which will store the completion data to be retrieved using an ID injected into the Request, which is passed back with its result when processed by the backend. This tracking structure also implements several generic USDT probes for tracing block events, rather than requiring the use of per-device probes. --- Cargo.lock | 1 + Cargo.toml | 1 + bin/propolis-server/src/lib/config.rs | 59 -- bin/propolis-server/src/lib/initializer.rs | 13 +- bin/propolis-standalone/src/cidata.rs | 1 + bin/propolis-standalone/src/config.rs | 1 - bin/propolis-standalone/src/main.rs | 10 +- lib/propolis/Cargo.toml | 1 + lib/propolis/src/block/backend.rs | 231 ++++++ lib/propolis/src/block/crucible.rs | 161 ++-- lib/propolis/src/block/device.rs | 355 ++++++++ lib/propolis/src/block/file.rs | 189 +++-- lib/propolis/src/block/in_memory.rs | 158 ++-- lib/propolis/src/block/mem_async.rs | 197 ++--- lib/propolis/src/block/mod.rs | 906 +++------------------ lib/propolis/src/hw/nvme/cmds.rs | 4 + lib/propolis/src/hw/nvme/mod.rs | 107 +-- lib/propolis/src/hw/nvme/requests.rs | 65 +- lib/propolis/src/hw/virtio/block.rs | 77 +- 19 files changed, 1210 insertions(+), 1327 deletions(-) create mode 100644 lib/propolis/src/block/backend.rs create mode 100644 lib/propolis/src/block/device.rs diff --git a/Cargo.lock b/Cargo.lock index 6a84f6795..e3ad453f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3049,6 +3049,7 @@ dependencies = [ "num_enum 0.5.11", "oximeter", "p9ds 0.1.0 (git+https://github.com/oxidecomputer/p9fs)", + "pin-project-lite", "propolis_types", "rand", "rfb", diff --git a/Cargo.toml b/Cargo.toml index 4221e3c10..149ab28e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -116,6 +116,7 @@ lazy_static = "1.4" libc = "0.2" mockall = "0.11" num_enum = "0.5.11" +pin-project-lite = "0.2.13" proc-macro2 = "1.0" progenitor = { git = "https://github.com/oxidecomputer/progenitor", branch = "main" } quote = "1.0" diff --git a/bin/propolis-server/src/lib/config.rs b/bin/propolis-server/src/lib/config.rs index 5c2fb69e1..20dd6e12c 100644 --- a/bin/propolis-server/src/lib/config.rs +++ b/bin/propolis-server/src/lib/config.rs @@ -4,67 +4,8 @@ //! Describes a server config which may be parsed from a TOML file. -use std::num::NonZeroUsize; -use std::sync::Arc; - -use propolis::block; -use propolis::inventory; pub use propolis_server_config::*; -pub fn create_backend_for_block( - config: &Config, - name: &str, - log: slog::Logger, -) -> Result<(Arc, inventory::ChildRegister), ParseError> { - let entry = config.block_devs.get(name).ok_or_else(|| { - ParseError::KeyNotFound(name.to_string(), "block_dev".to_string()) - })?; - blockdev_backend(entry, log) -} - -fn blockdev_backend( - dev: &BlockDevice, - log: slog::Logger, -) -> Result<(Arc, inventory::ChildRegister), ParseError> { - let opts = propolis::block::BackendOpts { - block_size: dev.opts.block_size, - read_only: dev.opts.read_only, - skip_flush: dev.opts.skip_flush, - }; - match &dev.bdtype as &str { - "file" => { - let path = dev - .options - .get("path") - .ok_or_else(|| { - ParseError::KeyNotFound( - "path".to_string(), - "options".to_string(), - ) - })? - .as_str() - .ok_or_else(|| { - ParseError::AsError( - "path".to_string(), - "as_str".to_string(), - ) - })?; - - let nworkers = NonZeroUsize::new(8).unwrap(); - let be = propolis::block::FileBackend::create( - path, opts, nworkers, log, - )?; - let child = - inventory::ChildRegister::new(&be, Some(path.to_string())); - - Ok((be, child)) - } - _ => { - panic!("unrecognized block dev type {}!", dev.bdtype); - } - } -} - #[cfg(not(feature = "omicron-build"))] pub fn reservoir_decide(log: &slog::Logger) -> bool { // Automatically enable use of the memory reservoir (rather than transient diff --git a/bin/propolis-server/src/lib/initializer.rs b/bin/propolis-server/src/lib/initializer.rs index ed8694334..5e90c9c47 100644 --- a/bin/propolis-server/src/lib/initializer.rs +++ b/bin/propolis-server/src/lib/initializer.rs @@ -335,9 +335,6 @@ impl<'a> MachineInitializer<'a> { ..Default::default() }, nworkers, - self.log.new( - slog::o!("component" => format!("file-{}", spec.path)), - ), )?; let child = @@ -363,6 +360,7 @@ impl<'a> MachineInitializer<'a> { info!(self.log, "Creating in-memory disk backend"; "len" => bytes.len()); + let nworkers = NonZeroUsize::new(8).unwrap(); let be = propolis::block::InMemoryBackend::create( bytes, propolis::block::BackendOpts { @@ -370,6 +368,7 @@ impl<'a> MachineInitializer<'a> { read_only: Some(spec.readonly), ..Default::default() }, + nworkers, )?; let child = inventory::ChildRegister::new( @@ -447,20 +446,18 @@ impl<'a> MachineInitializer<'a> { ) })?; - let be_info = backend.info(); match device_interface { DeviceInterface::Virtio => { - let vioblk = virtio::PciVirtioBlock::new(0x100, be_info); + let vioblk = virtio::PciVirtioBlock::new(0x100); let id = self.inv.register_instance(&vioblk, bdf.to_string())?; let _ = self.inv.register_child(child, id).unwrap(); - backend.attach(vioblk.clone())?; + block::attach(backend, vioblk.clone()); chipset.device().pci_attach(bdf, vioblk); } DeviceInterface::Nvme => { let nvme = nvme::PciNvme::create( name.to_string(), - be_info, self.log.new( slog::o!("component" => format!("nvme-{}", name)), ), @@ -468,7 +465,7 @@ impl<'a> MachineInitializer<'a> { let id = self.inv.register_instance(&nvme, bdf.to_string())?; let _ = self.inv.register_child(child, id).unwrap(); - backend.attach(nvme.clone())?; + block::attach(backend, nvme.clone()); chipset.device().pci_attach(bdf, nvme); } }; diff --git a/bin/propolis-standalone/src/cidata.rs b/bin/propolis-standalone/src/cidata.rs index b4723e3c5..f24a2657e 100644 --- a/bin/propolis-standalone/src/cidata.rs +++ b/bin/propolis-standalone/src/cidata.rs @@ -98,6 +98,7 @@ pub(crate) fn build_cidata_be( read_only: Some(true), ..Default::default() }, + std::num::NonZeroUsize::new(8).unwrap(), ) .context("could not create block backend") } diff --git a/bin/propolis-standalone/src/config.rs b/bin/propolis-standalone/src/config.rs index a4246cfd9..86427f992 100644 --- a/bin/propolis-standalone/src/config.rs +++ b/bin/propolis-standalone/src/config.rs @@ -65,7 +65,6 @@ pub fn block_backend( parsed.workers.unwrap_or(DEFAULT_WORKER_COUNT), ) .unwrap(), - log.clone(), ) .unwrap(); diff --git a/bin/propolis-standalone/src/main.rs b/bin/propolis-standalone/src/main.rs index 36f00b9ac..7ad78b5de 100644 --- a/bin/propolis-standalone/src/main.rs +++ b/bin/propolis-standalone/src/main.rs @@ -872,12 +872,11 @@ fn setup_instance( let (backend, creg) = config::block_backend(&config, dev, log); let bdf = bdf.unwrap(); - let info = backend.info(); - let vioblk = hw::virtio::PciVirtioBlock::new(0x100, info); + let vioblk = hw::virtio::PciVirtioBlock::new(0x100); let id = inv.register_instance(&vioblk, bdf.to_string())?; let _be_id = inv.register_child(creg, id)?; - backend.attach(vioblk.clone() as Arc)?; + block::attach(backend, vioblk.clone()); chipset.pci_attach(bdf, vioblk); } @@ -902,14 +901,13 @@ fn setup_instance( .as_str() .unwrap() .to_string(); - let info = backend.info(); let log = log.new(slog::o!("dev" => format!("nvme-{}", name))); - let nvme = hw::nvme::PciNvme::create(dev_serial, info, log); + let nvme = hw::nvme::PciNvme::create(dev_serial, log); let id = inv.register_instance(&nvme, bdf.to_string())?; let _be_id = inv.register_child(creg, id)?; - backend.attach(nvme.clone())?; + block::attach(backend, nvme.clone()); chipset.pci_attach(bdf, nvme); } diff --git a/lib/propolis/Cargo.toml b/lib/propolis/Cargo.toml index ce9ce75df..ddb03d27a 100644 --- a/lib/propolis/Cargo.toml +++ b/lib/propolis/Cargo.toml @@ -20,6 +20,7 @@ propolis_types.workspace = true usdt = { workspace = true, features = ["asm"] } tokio = { workspace = true, features = ["full"] } futures.workspace = true +pin-project-lite.workspace = true anyhow.workspace = true rfb.workspace = true slog.workspace = true diff --git a/lib/propolis/src/block/backend.rs b/lib/propolis/src/block/backend.rs new file mode 100644 index 000000000..224d53e3e --- /dev/null +++ b/lib/propolis/src/block/backend.rs @@ -0,0 +1,231 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Mechanisms required to implement a block backend + +use std::future::Future; +use std::pin::Pin; +use std::sync::{Arc, Condvar, Mutex, Weak}; +use std::task::{Context, Poll}; + +use crate::accessors::MemAccessor; +use crate::block::{device, Device, Request}; + +use pin_project_lite::pin_project; +use tokio::sync::{futures::Notified, Notify}; + +/// Reason why next request is unavailable from associated device +pub enum ReqError { + /// No request is pending from the device + NonePending, + /// Processing of requests from the device is paused + Paused, + /// Backend is not attached to any device + Detached, + /// Backend is halting workers + Halted, +} + +pub(super) struct AttachState { + sibling: Weak>>, + device: Arc, + acc_mem: MemAccessor, + dev_is_paused: bool, + backend_is_halted: bool, +} +impl AttachState { + fn next_req(&self) -> Result { + if self.backend_is_halted { + // The backend being halted is the most pressing status to consider, + // so it must be checked first + Err(ReqError::Halted) + } else if self.dev_is_paused { + // Do not allow the backend to pull any requests while the device is + // in the paused state + Err(ReqError::Paused) + } else { + self.device.next().ok_or(ReqError::NonePending) + } + } + pub(super) fn new( + dev_attach: &device::Attachment, + device: &Arc, + ) -> Self { + Self { + sibling: Arc::downgrade(&dev_attach.0), + device: device.clone(), + acc_mem: device.accessor_mem(), + dev_is_paused: false, + backend_is_halted: false, + } + } + pub(super) fn set_paused(&mut self, is_paused: bool) { + self.dev_is_paused = is_paused; + } + pub(super) fn same_as_sibling( + &self, + other: &Arc>>, + ) -> bool { + self.sibling.ptr_eq(&Arc::downgrade(other)) + } +} +pub(super) struct AttachInner { + pub(super) state: Mutex>, + req_notifier: Notify, + cv: Condvar, +} +impl AttachInner { + fn new() -> Self { + Self { + state: Mutex::new(None), + req_notifier: Notify::new(), + cv: Condvar::new(), + } + } +} + +/// State held by the backend about the attached (if any) device +pub struct Attachment(pub(super) Arc); +impl Attachment { + pub fn new() -> Self { + Attachment(Arc::new(AttachInner::new())) + } + + /// Attempt to retrieve the next [`Request`] from the attached (if any) + /// device. + /// + /// Will return an error if: + /// + /// - No device is attached + /// - The device is paused + /// - The backend is halted + /// - No requests are queued in the device + pub fn next_req(&self) -> Result { + let guard = self.0.state.lock().unwrap(); + let inner = guard.as_ref().ok_or(ReqError::Detached)?; + inner.next_req() + } + + /// Block (synchronously) in order to retrieve the next [`Request`] from the + /// device. Will return [`None`] if no device is attached, or the backend + /// is halted, otherwise it will block until a request is available. + pub fn block_for_req(&self) -> Option { + let mut guard = self.0.state.lock().unwrap(); + loop { + // bail if not attached + let inner = guard.as_ref()?; + if inner.backend_is_halted { + return None; + } + + if let Ok(req) = inner.next_req() { + return Some(req); + } + + guard = self.0.cv.wait(guard).unwrap(); + } + } + + /// Wait (via a [`Future`]) to retrieve the next [`Request`] from the + /// device. Will return [`None`] if no device is attached, or the backend + /// is halted. + pub fn wait_for_req(&self) -> WaitForReq { + WaitForReq { attachment: self, wait: self.0.req_notifier.notified() } + } + + /// Run provided function against [`MemAccessor`] for this backend. + /// + /// Intended to provide caller with means of creating/associated child + /// accessors. + pub fn accessor_mem( + &self, + f: impl FnOnce(Option<&MemAccessor>) -> R, + ) -> R { + match self.0.state.lock().unwrap().as_ref() { + Some(inner) => f(Some(&inner.acc_mem)), + None => f(None), + } + } + + /// Assert halted state on Attachment + pub fn halt(&self) { + if let Some(state) = self.0.state.lock().unwrap().as_mut() { + state.backend_is_halted = true; + } + self.notify(); + } + + /// Clear halted state from Attachment + pub fn start(&self) { + if let Some(state) = self.0.state.lock().unwrap().as_mut() { + state.backend_is_halted = false; + } + self.notify(); + } + + /// Detach from the associated (if any) device. + pub fn detach(&self) -> Option<()> { + // lock ordering demands we approach this from the device side + let be_lock = self.0.state.lock().unwrap(); + let be_inner = be_lock.as_ref()?; + let dev_inner = be_inner.sibling.upgrade()?; + drop(be_lock); + + device::AttachInner::detach(&dev_inner) + } + + /// Notify any [blocked](Self::block_for_req()) or + /// [waiting](Self::wait_for_req()) tasks of a state change. This could be + /// a change to the device, to the backend, or simply new request(s) + /// available. + pub fn notify(&self) { + // TODO: avoid thundering herd? + self.0.req_notifier.notify_waiters(); + self.0.cv.notify_all(); + } +} + +pin_project! { + /// Future returned from [`Attachment::wait_for_req()`] + /// + /// This future is not fused, so it can be repeatedly polled for additional + /// [`Request`]s as they become available. + pub struct WaitForReq<'a> { + attachment: &'a Attachment, + #[pin] + wait: Notified<'a>, + } +} +impl Future for WaitForReq<'_> { + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + loop { + match this.attachment.next_req() { + Ok(req) => return Poll::Ready(Some(req)), + Err(ReqError::Detached) | Err(ReqError::Halted) => { + // Let the consumer know that they should bail + return Poll::Ready(None); + } + Err(ReqError::NonePending) | Err(ReqError::Paused) => { + if let Poll::Ready(_) = + Notified::poll(this.wait.as_mut(), cx) + { + // The `Notified` future is fused, so we must "refresh" + // prior to any subsequent attempts to poll it after it + // emits `Ready` + this.wait + .set(this.attachment.0.req_notifier.notified()); + + // Take another lap if woken by the notifier to check + // for a pending request + continue; + } + return Poll::Pending; + } + } + } + } +} diff --git a/lib/propolis/src/block/crucible.rs b/lib/propolis/src/block/crucible.rs index d3c603584..4d7f7f0dc 100644 --- a/lib/propolis/src/block/crucible.rs +++ b/lib/propolis/src/block/crucible.rs @@ -7,8 +7,8 @@ use std::io; use std::sync::Arc; -use super::DeviceInfo; -use crate::block; +use crate::accessors::MemAccessor; +use crate::block::{self, DeviceInfo}; use crate::inventory::Entity; use crate::vmm::MemCtx; @@ -23,12 +23,50 @@ pub use nexus_client::Client as NexusClient; pub struct CrucibleBackend { tokio_rt: tokio::runtime::Handle, - block_io: Arc, - scheduler: block::Scheduler, - + state: Arc, +} +struct WorkerState { + attachment: block::backend::Attachment, + volume: Volume, info: block::DeviceInfo, skip_flush: bool, } +impl WorkerState { + async fn process_loop(&self, acc_mem: MemAccessor) { + let read_only = self.info.read_only; + let skip_flush = self.skip_flush; + loop { + let req = match self.attachment.wait_for_req().await { + Some(r) => r, + None => { + // bail + break; + } + }; + let res = if let Some(memctx) = acc_mem.access() { + match process_request( + &self.volume, + read_only, + skip_flush, + &req, + &memctx, + ) + .await + { + Ok(_) => block::Result::Success, + Err(e) => { + let mapped = block::Result::from(e); + assert!(mapped.is_err()); + mapped + } + } + } else { + block::Result::Failure + }; + req.complete(res); + } + } +} impl CrucibleBackend { pub fn create( @@ -112,15 +150,16 @@ impl CrucibleBackend { Ok(Arc::new(Self { tokio_rt: tokio::runtime::Handle::current(), - block_io: Arc::new(volume), - scheduler: block::Scheduler::new(), - - info: block::DeviceInfo { - block_size: block_size as u32, - total_size: sectors, - read_only: opts.read_only.unwrap_or(false), - }, - skip_flush: opts.skip_flush.unwrap_or(false), + state: Arc::new(WorkerState { + attachment: block::backend::Attachment::new(), + volume, + info: block::DeviceInfo { + block_size: block_size as u32, + total_size: sectors, + read_only: opts.read_only.unwrap_or(false), + }, + skip_flush: opts.skip_flush.unwrap_or(false), + }), })) } @@ -154,13 +193,14 @@ impl CrucibleBackend { /// Retrieve the UUID identifying this Crucible backend. pub fn get_uuid(&self) -> io::Result { let rt = tokio::runtime::Handle::current(); - rt.block_on(async { self.block_io.get_uuid().await }) + rt.block_on(async { self.state.volume.get_uuid().await }) .map_err(CrucibleError::into) } /// Issue a snapshot request pub async fn snapshot(&self, snapshot_id: Uuid) -> io::Result<()> { - self.block_io + self.state + .volume .flush(Some(SnapshotDetails { snapshot_name: snapshot_id.to_string(), })) @@ -176,58 +216,37 @@ impl CrucibleBackend { ) -> io::Result<()> { let old_vcr = serde_json::from_str(old_vcr_json)?; let new_vcr = serde_json::from_str(new_vcr_json)?; - self.block_io + self.state + .volume .target_replace(old_vcr, new_vcr) .await .map_err(CrucibleError::into) } -} - -impl block::Backend for CrucibleBackend { - fn info(&self) -> DeviceInfo { - self.info - } - - fn process(&self, _req: &block::Request, _mem: &MemCtx) -> block::Result { - panic!("request dispatch expected to be done through async logic"); - } - - fn attach(&self, dev: Arc) -> io::Result<()> { - self.scheduler.attach(dev); + fn spawn_workers(&self) { // TODO: make this tunable? let worker_count = 8; - for _n in 0..worker_count { - let bdev = self.block_io.clone(); - let read_only = self.info.read_only; - let skip_flush = self.skip_flush; - let mut worker = self.scheduler.worker(); - tokio::spawn(async move { - loop { - let res = match worker.next().await { - None => break, - Some((req, mguard)) => { - match process_request( - bdev.as_ref(), - read_only, - skip_flush, - req, - &mguard, - ) - .await - { - Ok(_) => block::Result::Success, - Err(_) => block::Result::Failure, - } - } - }; - worker.complete(res); - } + for n in 0..worker_count { + let worker_state = self.state.clone(); + let worker_acc = self.state.attachment.accessor_mem(|acc_mem| { + acc_mem + .expect("backend is attached") + .child(Some(format!("crucible worker {n}"))) }); + tokio::spawn( + async move { worker_state.process_loop(worker_acc).await }, + ); } + } +} - Ok(()) +impl block::Backend for CrucibleBackend { + fn attachment(&self) -> &block::backend::Attachment { + &self.state.attachment + } + fn info(&self) -> DeviceInfo { + self.state.info } } @@ -237,19 +256,15 @@ impl Entity for CrucibleBackend { } fn start(&self) -> anyhow::Result<()> { self.tokio_rt - .block_on(async move { self.block_io.activate().await })?; + .block_on(async move { self.state.volume.activate().await })?; + + self.state.attachment.start(); + self.spawn_workers(); - self.scheduler.start(); Ok(()) } - fn pause(&self) { - self.scheduler.pause(); - } - fn resume(&self) { - self.scheduler.resume(); - } fn halt(&self) { - self.scheduler.halt(); + self.state.attachment.halt(); } } @@ -269,6 +284,14 @@ pub enum Error { #[error("Crucible Error: {0}")] Crucible(#[from] CrucibleError), } +impl From for block::Result { + fn from(value: Error) -> Self { + match value { + Error::ReadOnly => block::Result::ReadOnly, + _ => block::Result::Failure, + } + } +} async fn process_request( block: &(dyn BlockIO + Send + Sync), @@ -278,11 +301,10 @@ async fn process_request( mem: &MemCtx, ) -> Result<(), Error> { match req.oper() { - block::Operation::Read(off) => { + block::Operation::Read(off, len) => { let maps = req.mappings(mem).ok_or_else(|| Error::BadGuestRegion)?; - let len = req.len(); let offset = block.byte_offset_to_block(off as u64).await?; // Perform one large read from crucible, and write from data into @@ -302,7 +324,7 @@ async fn process_request( return Err(Error::CopyError(nwritten, len)); } } - block::Operation::Write(off) => { + block::Operation::Write(off, len) => { if read_only { return Err(Error::ReadOnly); } @@ -311,7 +333,6 @@ async fn process_request( // to crucible let maps = req.mappings(mem).ok_or_else(|| Error::BadGuestRegion)?; - let len = req.len(); let mut vec: Vec = vec![0; len]; let mut nread = 0; for mapping in maps { diff --git a/lib/propolis/src/block/device.rs b/lib/propolis/src/block/device.rs new file mode 100644 index 000000000..f763202d8 --- /dev/null +++ b/lib/propolis/src/block/device.rs @@ -0,0 +1,355 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Mechanisms required to implement a block device (frontend) + +use std::collections::BTreeMap; +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex, Weak}; +use std::task::{Context, Poll, Waker}; +use std::time::Instant; + +use crate::block::{ + self, backend, probes, Backend, CacheMode, Device, DeviceInfo, Operation, + ReqId, Request, +}; + +pub(super) struct AttachInner { + sibling: Weak, + backend: Arc, + paused: bool, +} +impl AttachInner { + pub(super) fn detach(dev: &Arc>>) -> Option<()> { + let mut dev_lock = dev.lock().unwrap(); + let dev_inner = dev_lock.as_ref()?; + + let be_inner = dev_inner.sibling.upgrade()?; + let mut be_lock = be_inner.state.lock().unwrap(); + let be_state = be_lock.as_ref()?; + + assert!(be_state.same_as_sibling(dev)); + *dev_lock = None; + *be_lock = None; + + Some(()) + } + fn lock_sibling(&self, f: impl FnOnce(&mut backend::AttachState)) { + let sibling = self + .sibling + .upgrade() + .expect("Device sibling should be present when attached"); + + let mut guard = sibling.state.lock().unwrap(); + let sibling_state = guard.as_mut().expect( + "Backend sibling should be present while device is attached", + ); + f(sibling_state) + } + pub(super) fn new( + be_attach: &backend::Attachment, + backend: &Arc, + ) -> Self { + Self { + sibling: Arc::downgrade(&be_attach.0), + backend: backend.clone(), + paused: false, + } + } +} + +/// State held by the device about the attached (if any) backend +pub struct Attachment(pub(super) Arc>>); +impl Attachment { + pub fn new() -> Self { + Attachment(Arc::new(Mutex::new(None))) + } + + /// Query [`DeviceInfo`] from associated backend (if attached) + pub fn info(&self) -> Option { + self.0.lock().unwrap().as_ref().map(|inner| inner.backend.info()) + } + + /// Set cache mode on associated backend + /// + /// # Warning + /// + /// This is currently unimplemented! + pub fn set_cache_mode(&self, _mode: CacheMode) { + todo!("wire up cache mode toggling") + } + + /// Notify attached backend of (new) pending requests + pub fn notify(&self) { + let guard = self.0.lock().unwrap(); + if let Some(inner) = guard.as_ref() { + if !inner.paused { + let be = inner.backend.clone(); + drop(guard); + be.attachment().notify(); + } + } + } + + /// Pause request processing for this device. + /// + /// Backend (if attached) will not be able to retrieving any requests from + /// this device while paused. The completions for any requests in flight, + /// however, will be able to flow through. + pub fn pause(&self) { + let mut guard = self.0.lock().unwrap(); + if let Some(inner) = guard.as_mut() { + inner.paused = true; + inner.lock_sibling(|sib| { + sib.set_paused(true); + }); + } + } + + /// Clear the paused state on this device, allowing the backend (if + /// attached) to retrieve requests once again. + pub fn resume(&self) { + let mut guard = self.0.lock().unwrap(); + if let Some(inner) = guard.as_mut() { + if !inner.paused { + return; + } + + inner.paused = false; + inner.lock_sibling(|sib| { + sib.set_paused(false); + }); + let be = inner.backend.clone(); + drop(guard); + be.attachment().notify(); + } + } + + /// Detach from the associated (if any) backend. + pub fn detach(&self) -> Option<()> { + AttachInner::detach(&self.0) + } +} + +static NEXT_DEVICE_ID: AtomicU64 = AtomicU64::new(1); + +/// Tracking structure for outstanding block [`Request`]s. +/// +/// As requests are emitted to the associated backend, the corresponding data +/// required to indicate a completion to the guest will be stored here. The +/// Request is tagged with a unique [`ReqId`] which is used to retrieve said +/// completion data, as well as track the time used to process the request. +/// +/// Although use of [`Tracking`] is not required by the block abstraction, it is +/// here where the general USDT probes are attached. A device which eschews its +/// use will be missing calls into those probes. +pub struct Tracking { + inner: Mutex>, + wait: Arc>, +} +struct TrackingInner { + device_id: u64, + next_id: ReqId, + dev: Weak, + outstanding: BTreeMap>, +} +struct TrackingEntry { + op: Operation, + payload: T, + /// When this request was submitted to the backend to be processed + time_submitted: Instant, +} + +impl Tracking { + pub fn new(dev: Weak) -> Self { + let device_id = NEXT_DEVICE_ID.fetch_add(1, Ordering::Relaxed); + Self { + inner: Mutex::new(TrackingInner { + device_id, + next_id: ReqId::START, + dev, + outstanding: BTreeMap::new(), + }), + wait: Arc::new(Mutex::new(TrackingWait::new())), + } + } + + /// Record tracking in an [`Request`] prior to passing it to the associated + /// [`Backend`]. The request will be assigned a unique [`ReqId`] which can + /// be used to a later call to [`Tracking::complete()`] to retrieve the + /// `payload` data required to communicate its completion. + /// + pub fn track(&self, mut req: Request, payload: T) -> Request { + let now = Instant::now(); + let mut guard = self.inner.lock().unwrap(); + let began_empty = guard.outstanding.is_empty(); + let id = guard.next_id; + guard.next_id.advance(); + let marker = TrackingMarker { + id, + dev: guard.dev.upgrade().expect("device still exists"), + }; + guard.outstanding.insert( + marker.id, + TrackingEntry { op: req.op, payload, time_submitted: now }, + ); + + let old = req.marker.replace(marker); + assert!(old.is_none(), "request should be tracked only once"); + + if began_empty { + self.wait.lock().unwrap().clear_empty() + } + let devid = guard.device_id; + match req.op { + Operation::Read(off, len) => { + probes::block_begin_read!(|| { + (devid, id, off as u64, len as u64) + }); + } + Operation::Write(off, len) => { + probes::block_begin_write!(|| { + (devid, id, off as u64, len as u64) + }); + } + Operation::Flush => { + probes::block_begin_flush!(|| { (devid, id) }); + } + } + + req + } + + /// Indicate the completion of a pending [`Request`], retrieving the + /// associated payload data. The [`block::Result`] argument is used to + /// communicate the result through the generic block USDT probe. + pub fn complete(&self, id: ReqId, res: block::Result) -> (Operation, T) { + let now = Instant::now(); + let mut guard = self.inner.lock().unwrap(); + let entry = guard + .outstanding + .remove(&id) + .expect("tracked request should be present"); + + let devid = guard.device_id; + let proc_ns = + now.duration_since(entry.time_submitted).as_nanos() as u64; + // TODO: calculate queued time + let queue_ns = 0; + let rescode = res as u8; + match entry.op { + Operation::Read(..) => { + probes::block_complete_read!(|| { + (devid, id, rescode, proc_ns, queue_ns) + }); + } + Operation::Write(..) => { + probes::block_complete_write!(|| { + (devid, id, rescode, proc_ns, queue_ns) + }); + } + Operation::Flush => { + probes::block_complete_flush!(|| { + (devid, id, rescode, proc_ns, queue_ns) + }); + } + } + + if guard.outstanding.is_empty() { + self.wait.lock().unwrap().set_empty(); + } + + (entry.op, entry.payload) + } + + /// Query if there are any tracked requests outstanding + pub fn any_outstanding(&self) -> bool { + let guard = self.inner.lock().unwrap(); + !guard.outstanding.is_empty() + } + + /// Emit a [`Future`] which will resolve when there are no tracked + /// request outstanding in this structure. + pub fn none_outstanding(&self) -> NoneOutstanding { + NoneOutstanding::new(self.wait.clone()) + } +} + +/// Record keeping for [`NoneOutstanding`] futures emitted by [`Tracking`] +struct TrackingWait { + empty: bool, + gen: usize, + wake: Vec, +} +impl TrackingWait { + fn new() -> Self { + Self { empty: true, gen: 1, wake: Vec::new() } + } + fn set_empty(&mut self) { + self.empty = true; + self.gen += 1; + + for waker in self.wake.drain(..) { + waker.wake() + } + } + fn clear_empty(&mut self) { + self.empty = false; + } +} + +/// Future which will complete when the referenced [`Tracking`] has no more +/// requests outstanding. +pub struct NoneOutstanding { + wait: Arc>, + gen: usize, +} +impl NoneOutstanding { + fn new(wait: Arc>) -> Self { + Self { wait, gen: 0 } + } +} +impl Future for NoneOutstanding { + type Output = (); + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll { + let mut wguard = self.wait.lock().unwrap(); + if wguard.empty { + Poll::Ready(()) + } else { + // Add us to the waker list if the TrackingWait generation is newer + // than this future. That list is cleaned up by draining its + // entries when waking them (forcing interested futures to re-add + // themselves if need be). + // + // NoneOutstanding instances which are dropped before this occurs + // will "leak" their entry in waker list insofar as it will not be + // cleaned up until the next time the Tracking structure becomes + // empty. This is considered an acceptable trade-off for + // simplicity. + if wguard.gen > self.gen { + wguard.wake.push(cx.waker().clone()); + let gen = wguard.gen; + drop(wguard); + self.gen = gen; + } + Poll::Pending + } + } +} + +pub(super) struct TrackingMarker { + id: ReqId, + dev: Arc, +} +impl TrackingMarker { + pub(super) fn complete(self, res: block::Result) { + self.dev.complete(res, self.id); + } +} diff --git a/lib/propolis/src/block/file.rs b/lib/propolis/src/block/file.rs index c4a9fc4f4..ce5772e39 100644 --- a/lib/propolis/src/block/file.rs +++ b/lib/propolis/src/block/file.rs @@ -7,25 +7,86 @@ use std::io::{Error, ErrorKind, Result}; use std::num::NonZeroUsize; use std::os::unix::io::AsRawFd; use std::path::Path; -use std::sync::{Arc, Weak}; +use std::sync::Arc; -use super::DeviceInfo; -use crate::block; +use crate::accessors::MemAccessor; +use crate::block::{self, DeviceInfo}; use crate::inventory::Entity; use crate::vmm::{MappingExt, MemCtx}; // XXX: completely arb for now const MAX_WORKERS: usize = 32; -/// Standard [`Backend`](super::Backend) implementation. pub struct FileBackend { - fp: Arc, - driver: block::Driver, - log: slog::Logger, + state: Arc, + + worker_count: NonZeroUsize, +} +struct WorkerState { + attachment: block::backend::Attachment, + fp: File, info: block::DeviceInfo, skip_flush: bool, } +impl WorkerState { + fn processing_loop(&self, acc_mem: MemAccessor) { + while let Some(req) = self.attachment.block_for_req() { + if self.info.read_only && req.oper().is_write() { + req.complete(block::Result::ReadOnly); + continue; + } + + let mem = match acc_mem.access() { + Some(m) => m, + None => { + req.complete(block::Result::Failure); + continue; + } + }; + let res = match self.process_request(&req, &mem) { + Ok(_) => block::Result::Success, + Err(_) => block::Result::Failure, + }; + req.complete(res); + } + } + + fn process_request( + &self, + req: &block::Request, + mem: &MemCtx, + ) -> std::result::Result<(), &'static str> { + match req.oper() { + block::Operation::Read(off, len) => { + let maps = req.mappings(mem).ok_or("mapping unavailable")?; + + let nbytes = maps + .preadv(self.fp.as_raw_fd(), off as i64) + .map_err(|_| "io error")?; + if nbytes != len { + return Err("bad read length"); + } + } + block::Operation::Write(off, len) => { + let maps = req.mappings(mem).ok_or("bad guest region")?; + + let nbytes = maps + .pwritev(self.fp.as_raw_fd(), off as i64) + .map_err(|_| "io error")?; + if nbytes != len { + return Err("bad write length"); + } + } + block::Operation::Flush => { + if !self.skip_flush { + self.fp.sync_data().map_err(|_| "io error")?; + } + } + } + Ok(()) + } +} impl FileBackend { /// Creates a new block device from a device at `path`. @@ -33,7 +94,6 @@ impl FileBackend { path: impl AsRef, opts: block::BackendOpts, worker_count: NonZeroUsize, - log: slog::Logger, ) -> Result> { if worker_count.get() > MAX_WORKERS { return Err(Error::new( @@ -58,89 +118,47 @@ impl FileBackend { // TODO: attempt to query blocksize from underlying file/zvol let block_size = opts.block_size.unwrap_or(block::DEFAULT_BLOCK_SIZE); - Ok(Arc::new_cyclic(|me| Self { - fp: Arc::new(fp), - driver: block::Driver::new( - me.clone() as Weak, - "file-bdev".to_string(), - worker_count, - ), - log, - - skip_flush: opts.skip_flush.unwrap_or(false), - info: block::DeviceInfo { - block_size, - total_size: len / block_size as u64, - read_only, - }, - })) - } - fn process_request( - &self, - req: &block::Request, - mem: &MemCtx, - ) -> Result<()> { - match req.oper() { - block::Operation::Read(off) => { - let maps = req.mappings(mem).ok_or_else(|| { - Error::new(ErrorKind::Other, "bad guest region") - })?; + Ok(Arc::new(Self { + state: Arc::new(WorkerState { + attachment: block::backend::Attachment::new(), - let nbytes = maps.preadv(self.fp.as_raw_fd(), off as i64)?; - if nbytes != req.len() { - return Err(Error::new( - ErrorKind::Other, - "bad read length", - )); - } - } - block::Operation::Write(off) => { - if self.info.read_only { - return Err(Error::new( - ErrorKind::PermissionDenied, - "backend is read-only", - )); - } + fp, - let maps = req.mappings(mem).ok_or_else(|| { - Error::new(ErrorKind::Other, "bad guest region") + skip_flush: opts.skip_flush.unwrap_or(false), + info: block::DeviceInfo { + block_size, + total_size: len / block_size as u64, + read_only, + }, + }), + worker_count, + })) + } + fn spawn_workers(&self) -> std::io::Result<()> { + for n in 0..self.worker_count.get() { + let worker_state = self.state.clone(); + let worker_acc = self.state.attachment.accessor_mem(|mem| { + mem.expect("backend is attached") + .child(Some(format!("worker {n}"))) + }); + + let _join = std::thread::Builder::new() + .name(format!("file worker {n}")) + .spawn(move || { + worker_state.processing_loop(worker_acc); })?; - - let nbytes = maps.pwritev(self.fp.as_raw_fd(), off as i64)?; - if nbytes != req.len() { - return Err(Error::new( - ErrorKind::Other, - "bad write length", - )); - } - } - block::Operation::Flush => { - if !self.skip_flush { - self.fp.sync_data()?; - } - } } Ok(()) } } impl block::Backend for FileBackend { - fn info(&self) -> DeviceInfo { - self.info + fn attachment(&self) -> &block::backend::Attachment { + &self.state.attachment } - fn attach(&self, dev: Arc) -> Result<()> { - self.driver.attach(dev) - } - - fn process(&self, req: &block::Request, mem: &MemCtx) -> block::Result { - match self.process_request(req, mem) { - Ok(_) => block::Result::Success, - Err(e) => { - slog::info!(self.log, "block IO error {:?}", req.op; "error" => e); - block::Result::Failure - } - } + fn info(&self) -> DeviceInfo { + self.state.info } } impl Entity for FileBackend { @@ -148,16 +166,11 @@ impl Entity for FileBackend { "block-file" } fn start(&self) -> anyhow::Result<()> { - self.driver.start(); + self.spawn_workers()?; + self.state.attachment.start(); Ok(()) } - fn pause(&self) { - self.driver.pause(); - } - fn resume(&self) { - self.driver.resume(); - } fn halt(&self) { - self.driver.halt(); + self.state.attachment.halt(); } } diff --git a/lib/propolis/src/block/in_memory.rs b/lib/propolis/src/block/in_memory.rs index b85f99ef5..ddfd4fc52 100644 --- a/lib/propolis/src/block/in_memory.rs +++ b/lib/propolis/src/block/in_memory.rs @@ -4,54 +4,44 @@ use std::io::{Error, ErrorKind, Result}; use std::num::NonZeroUsize; -use std::sync::{Arc, Mutex, Weak}; +use std::sync::{Arc, Mutex}; +use crate::accessors::MemAccessor; use crate::block; use crate::inventory::Entity; use crate::vmm::{MemCtx, SubMapping}; pub struct InMemoryBackend { - bytes: Mutex>, - driver: block::Driver, + state: Arc, + worker_count: NonZeroUsize, +} +struct WorkingState { + attachment: block::backend::Attachment, + bytes: Mutex>, info: block::DeviceInfo, } +impl WorkingState { + fn processing_loop(&self, acc_mem: MemAccessor) { + while let Some(req) = self.attachment.block_for_req() { + if self.info.read_only && req.oper().is_write() { + req.complete(block::Result::ReadOnly); + continue; + } -impl InMemoryBackend { - pub fn create( - bytes: Vec, - opts: block::BackendOpts, - ) -> Result> { - let block_size = opts.block_size.unwrap_or(block::DEFAULT_BLOCK_SIZE); - - let len = bytes.len(); - if len == 0 { - return Err(Error::new(ErrorKind::Other, "size cannot be 0")); - } else if (len % block_size as usize) != 0 { - return Err(Error::new( - ErrorKind::Other, - format!( - "size {} not multiple of block size {}!", - len, block_size, - ), - )); + let mem = match acc_mem.access() { + Some(m) => m, + None => { + req.complete(block::Result::Failure); + continue; + } + }; + let res = match self.process_request(&req, &mem) { + Ok(_) => block::Result::Success, + Err(_) => block::Result::Failure, + }; + req.complete(res); } - - Ok(Arc::new_cyclic(|me| Self { - bytes: Mutex::new(bytes), - - driver: block::Driver::new( - me.clone() as Weak, - "mem-bdev".to_string(), - NonZeroUsize::new(1).unwrap(), - ), - - info: block::DeviceInfo { - block_size, - total_size: len as u64 / block_size as u64, - read_only: opts.read_only.unwrap_or(false), - }, - })) } fn process_request( @@ -60,15 +50,15 @@ impl InMemoryBackend { mem: &MemCtx, ) -> Result<()> { match req.oper() { - block::Operation::Read(off) => { + block::Operation::Read(off, len) => { let maps = req.mappings(mem).ok_or_else(|| { Error::new(ErrorKind::Other, "bad guest region") })?; let bytes = self.bytes.lock().unwrap(); - process_read_request(&bytes, off as u64, req.len(), &maps)?; + process_read_request(&bytes, off as u64, len, &maps)?; } - block::Operation::Write(off) => { + block::Operation::Write(off, len) => { if self.info.read_only { return Err(Error::new( ErrorKind::PermissionDenied, @@ -81,12 +71,7 @@ impl InMemoryBackend { })?; let mut bytes = self.bytes.lock().unwrap(); - process_write_request( - &mut bytes, - off as u64, - req.len(), - &maps, - )?; + process_write_request(&mut bytes, off as u64, len, &maps)?; } block::Operation::Flush => { // nothing to do @@ -97,24 +82,64 @@ impl InMemoryBackend { } } -impl block::Backend for InMemoryBackend { - fn info(&self) -> block::DeviceInfo { - self.info - } +impl InMemoryBackend { + pub fn create( + bytes: Vec, + opts: block::BackendOpts, + worker_count: NonZeroUsize, + ) -> Result> { + let block_size = opts.block_size.unwrap_or(block::DEFAULT_BLOCK_SIZE); - fn attach(&self, dev: Arc) -> Result<()> { - self.driver.attach(dev) - } + let len = bytes.len(); + if len == 0 { + return Err(Error::new(ErrorKind::Other, "size cannot be 0")); + } else if (len % block_size as usize) != 0 { + return Err(Error::new( + ErrorKind::Other, + format!( + "size {} not multiple of block size {}!", + len, block_size, + ), + )); + } - fn process(&self, req: &block::Request, mem: &MemCtx) -> block::Result { - match self.process_request(req, mem) { - Ok(_) => block::Result::Success, - Err(_e) => { - // TODO: add detail - //slog::info!(self.log, "block IO error {:?}", req.op; "error" => e); - block::Result::Failure - } + Ok(Arc::new(Self { + state: Arc::new(WorkingState { + attachment: block::backend::Attachment::new(), + bytes: Mutex::new(bytes), + info: block::DeviceInfo { + block_size, + total_size: len as u64 / block_size as u64, + read_only: opts.read_only.unwrap_or(false), + }, + }), + worker_count, + })) + } + fn spawn_workers(&self) -> Result<()> { + for n in 0..self.worker_count.get() { + let worker_state = self.state.clone(); + let worker_acc = self.state.attachment.accessor_mem(|mem| { + mem.expect("backend is attached") + .child(Some(format!("worker {n}"))) + }); + + let _join = std::thread::Builder::new() + .name(format!("in-memory worker {n}")) + .spawn(move || { + worker_state.processing_loop(worker_acc); + })?; } + Ok(()) + } +} + +impl block::Backend for InMemoryBackend { + fn attachment(&self) -> &block::backend::Attachment { + &self.state.attachment + } + fn info(&self) -> block::DeviceInfo { + self.state.info } } @@ -123,17 +148,12 @@ impl Entity for InMemoryBackend { "block-in-memory" } fn start(&self) -> anyhow::Result<()> { - self.driver.start(); + self.state.attachment.start(); + self.spawn_workers()?; Ok(()) } - fn pause(&self) { - self.driver.pause(); - } - fn resume(&self) { - self.driver.resume(); - } fn halt(&self) { - self.driver.halt(); + self.state.attachment.halt(); } } diff --git a/lib/propolis/src/block/mem_async.rs b/lib/propolis/src/block/mem_async.rs index 6615b2d38..35821024f 100644 --- a/lib/propolis/src/block/mem_async.rs +++ b/lib/propolis/src/block/mem_async.rs @@ -7,6 +7,7 @@ use std::num::NonZeroUsize; use std::ptr::NonNull; use std::sync::Arc; +use crate::accessors::MemAccessor; use crate::block; use crate::inventory::Entity; use crate::vmm::MemCtx; @@ -17,13 +18,82 @@ use crate::vmm::MemCtx; /// this backend can be used for measuring how other parts of the emulation /// stack perform. pub struct MemAsyncBackend { - seg: Arc, + work_state: Arc, workers: NonZeroUsize, - scheduler: block::Scheduler, - +} +struct WorkingState { + attachment: block::backend::Attachment, + seg: MmapSeg, info: block::DeviceInfo, } +impl WorkingState { + async fn processing_loop(&self, acc_mem: MemAccessor) { + while let Some(req) = self.attachment.wait_for_req().await { + if self.info.read_only && req.oper().is_write() { + req.complete(block::Result::ReadOnly); + continue; + } + let res = match acc_mem + .access() + .and_then(|mem| self.process_request(&req, &mem).ok()) + { + Some(_) => block::Result::Success, + None => block::Result::Failure, + }; + req.complete(res); + } + } + + fn process_request( + &self, + req: &block::Request, + mem: &MemCtx, + ) -> std::result::Result<(), &'static str> { + let seg = &self.seg; + match req.oper() { + block::Operation::Read(off, _len) => { + let maps = req.mappings(mem).ok_or("bad mapping")?; + + let mut nread = 0; + for map in maps { + unsafe { + let len = map.len(); + let read_ptr = map + .raw_writable() + .ok_or("expected writable mapping")?; + if !seg.read(off + nread, read_ptr, len) { + return Err("failed mem read"); + } + nread += len; + }; + } + } + block::Operation::Write(off, _len) => { + let maps = req.mappings(mem).ok_or("bad mapping")?; + + let mut nwritten = 0; + for map in maps { + unsafe { + let len = map.len(); + let write_ptr = map + .raw_readable() + .ok_or("expected readable mapping")?; + if !seg.write(off + nwritten, write_ptr, len) { + return Err("failed mem write"); + } + nwritten += len; + }; + } + } + block::Operation::Flush => { + // nothing to do + } + } + + Ok(()) + } +} impl MemAsyncBackend { pub fn create( @@ -48,69 +118,34 @@ impl MemAsyncBackend { let seg = MmapSeg::new(size as usize)?; Ok(Arc::new(Self { - seg: Arc::new(seg), + work_state: Arc::new(WorkingState { + attachment: block::backend::Attachment::new(), + info: block::DeviceInfo { + block_size, + total_size: size / block_size as u64, + read_only: opts.read_only.unwrap_or(false), + }, + seg, + }), workers, - scheduler: block::Scheduler::new(), - - info: block::DeviceInfo { - block_size, - total_size: size / block_size as u64, - read_only: opts.read_only.unwrap_or(false), - }, })) } -} -fn process_request( - seg: &MmapSeg, - read_only: bool, - req: &block::Request, - mem: &MemCtx, -) -> std::result::Result<(), &'static str> { - match req.oper() { - block::Operation::Read(off) => { - let maps = req.mappings(mem).ok_or("bad guest region")?; - - let mut nread = 0; - for map in maps { - unsafe { - let len = map.len(); - let read_ptr = - map.raw_writable().ok_or("wrong protection")?; - if !seg.read(off + nread, read_ptr, len) { - return Err("read too long"); - } - nread += len; - }; - } - } - block::Operation::Write(off) => { - if read_only { - return Err("dev is readonly"); - } - - let maps = req.mappings(mem).ok_or("bad guest region")?; - - let mut nwritten = 0; - for map in maps { - unsafe { - let len = map.len(); - let write_ptr = - map.raw_readable().ok_or("wrong protection")?; - if !seg.write(off + nwritten, write_ptr, len) { - return Err("write too long"); - } - nwritten += len; - }; - } - } - block::Operation::Flush => { - // nothing to do + fn spawn_workers(&self) { + for n in 0..self.workers.get() { + let worker_state = self.work_state.clone(); + let worker_acc = + self.work_state.attachment.accessor_mem(|acc_mem| { + acc_mem + .expect("backend is attached") + .child(Some(format!("worker {n}"))) + }); + tokio::spawn(async move { + worker_state.processing_loop(worker_acc).await + }); } } - - Ok(()) } struct MmapSeg(NonNull, usize); @@ -162,37 +197,10 @@ unsafe impl Sync for MmapSeg {} impl block::Backend for MemAsyncBackend { fn info(&self) -> block::DeviceInfo { - self.info - } - - fn attach(&self, dev: Arc) -> Result<()> { - self.scheduler.attach(dev); - - for _n in 0..self.workers.get() { - let seg = self.seg.clone(); - let ro = self.info.read_only; - let mut worker = self.scheduler.worker(); - tokio::spawn(async move { - loop { - let res = match worker.next().await { - None => break, - Some((req, mguard)) => { - match process_request(&seg, ro, req, &mguard) { - Ok(_) => block::Result::Success, - Err(_) => block::Result::Failure, - } - } - }; - worker.complete(res); - } - }); - } - - Ok(()) + self.work_state.info } - - fn process(&self, _req: &block::Request, _mem: &MemCtx) -> block::Result { - panic!("request dispatch expected to be done through async logic"); + fn attachment(&self) -> &block::backend::Attachment { + &self.work_state.attachment } } @@ -201,16 +209,11 @@ impl Entity for MemAsyncBackend { "block-memory-async" } fn start(&self) -> anyhow::Result<()> { - self.scheduler.start(); + self.work_state.attachment.start(); + self.spawn_workers(); Ok(()) } - fn pause(&self) { - self.scheduler.pause(); - } - fn resume(&self) { - self.scheduler.resume(); - } fn halt(&self) { - self.scheduler.halt(); + self.work_state.attachment.halt(); } } diff --git a/lib/propolis/src/block/mod.rs b/lib/propolis/src/block/mod.rs index 043e5a955..eebf5b1db 100644 --- a/lib/propolis/src/block/mod.rs +++ b/lib/propolis/src/block/mod.rs @@ -4,20 +4,13 @@ //! Implements an interface to virtualized block devices. -use std::any::Any; -use std::collections::VecDeque; -use std::num::NonZeroUsize; -use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; -use std::sync::{Arc, Condvar, Mutex, OnceLock, Weak}; +use std::borrow::Borrow; +use std::sync::Arc; -use crate::accessors::{Guard as AccessorGuard, MemAccessor}; +use crate::accessors::MemAccessor; use crate::common::*; -use crate::tasks::*; use crate::vmm::{MemCtx, SubMapping}; -use futures::future::BoxFuture; -use tokio::sync::{Mutex as TokioMutex, Notify, Semaphore}; - mod file; pub use file::FileBackend; @@ -32,6 +25,9 @@ pub use in_memory::InMemoryBackend; mod mem_async; pub use mem_async::MemAsyncBackend; +pub mod backend; +pub mod device; + pub type ByteOffset = usize; pub type ByteLen = usize; @@ -39,72 +35,113 @@ pub type ByteLen = usize; /// is not choosing a block size, a default of 512B is used. pub const DEFAULT_BLOCK_SIZE: u32 = 512; +#[usdt::provider(provider = "propolis")] +mod probes { + fn block_begin_read(dev_id: u64, req_id: u64, offset: u64, len: u64) {} + fn block_begin_write(dev_id: u64, req_id: u64, offset: u64, len: u64) {} + fn block_begin_flush(dev_id: u64, req_id: u64) {} + + fn block_complete_read( + dev_id: u64, + req_id: u64, + result: u8, + proc_ns: u64, + queue_ns: u64, + ) { + } + fn block_complete_write( + dev_id: u64, + req_id: u64, + result: u8, + proc_ns: u64, + queue_ns: u64, + ) { + } + fn block_complete_flush( + dev_id: u64, + req_id: u64, + result: u8, + proc_ns: u64, + queue_ns: u64, + ) { + } +} + /// Type of operations which may be issued to a virtual block device. #[derive(Copy, Clone, Debug, PartialEq)] pub enum Operation { - /// Read from offset - Read(ByteOffset), - /// Write to offset - Write(ByteOffset), + /// Read from `offset` for `len` + Read(ByteOffset, ByteLen), + /// Write to `offset` for len + Write(ByteOffset, ByteLen), /// Flush buffer(s) Flush, } +impl Operation { + pub const fn is_read(&self) -> bool { + matches!(self, Operation::Read(..)) + } + pub const fn is_write(&self) -> bool { + matches!(self, Operation::Write(..)) + } + pub const fn is_flush(&self) -> bool { + matches!(self, Operation::Flush) + } +} +/// Result of a block [`Request`] #[derive(Copy, Clone, Debug)] pub enum Result { + /// Request succeeded Success = 0, + /// Backend indicated failure for operation Failure, + /// Underlying backend is read-only + ReadOnly, + /// Operation not supported by backend Unsupported, } - -pub type BlockPayload = dyn Any + Send + Sync + 'static; +impl Result { + pub const fn is_err(&self) -> bool { + !matches!(self, Result::Success) + } +} /// Block device operation request pub struct Request { /// The type of operation requested by the block device op: Operation, - /// A list of regions of guest memory to read/write into as part of the I/O request + /// A list of regions of guest memory to read/write into as part of the I/O + /// request regions: Vec, - /// Block device specific completion payload for this I/O request - payload: Option>, - - /// Book-keeping for tracking outstanding requests from the block device - /// - /// The `Request::new_*` methods, called by the block device, explicitly initialize - /// this to `None`. Upon getting the request from the block device and before we - /// submit it to the backend, we update this to point to the correct shared reference. - /// See [`Request::track_outstanding`]. - outstanding: Option>, + /// Store [`device::TrackingMarker`] when this request is tracked by a + /// [`device::Tracking`] for that device. It is through this marker that + /// the result of the block request is communicated back to the device + /// emulation for processing. + marker: Option, } impl Request { pub fn new_read( - off: usize, + off: ByteOffset, + len: ByteLen, regions: Vec, - payload: Box, ) -> Self { - let op = Operation::Read(off); - Self { op, regions, payload: Some(payload), outstanding: None } + Self { op: Operation::Read(off, len), regions, marker: None } } pub fn new_write( - off: usize, + off: ByteOffset, + len: ByteLen, regions: Vec, - payload: Box, ) -> Self { - let op = Operation::Write(off); - Self { op, regions, payload: Some(payload), outstanding: None } + Self { op: Operation::Write(off, len), regions, marker: None } } - pub fn new_flush(payload: Box) -> Self { + pub fn new_flush() -> Self { let op = Operation::Flush; - Self { - op, - regions: Vec::new(), - payload: Some(payload), - outstanding: None, - } + Self { op, regions: Vec::new(), marker: None } } /// Type of operation being issued. @@ -119,54 +156,33 @@ impl Request { pub fn mappings<'a>(&self, mem: &'a MemCtx) -> Option>> { match &self.op { - Operation::Read(_) => { + Operation::Read(..) => { self.regions.iter().map(|r| mem.writable_region(r)).collect() } - Operation::Write(_) => { + Operation::Write(..) => { self.regions.iter().map(|r| mem.readable_region(r)).collect() } Operation::Flush => None, } } - /// Total length of operation - pub fn len(&self) -> usize { - match &self.op { - Operation::Read(_) | Operation::Write(_) => { - self.regions.iter().map(|r| r.1).sum() - } - Operation::Flush => 0, + /// Indicate disposition of completed request + pub fn complete(mut self, res: Result) { + if let Some(marker) = self.marker.take() { + marker.complete(res); } } - - /// Indiciate disposition of completed request - pub fn complete(mut self, res: Result, dev: &dyn Device) { - let payload = self.payload.take().unwrap(); - dev.complete(self.op, res, payload); - - // Update the outstanding I/O count - self.outstanding - .take() - .expect("missing OutstandingReqs ref") - .decrement(); - } - - /// Update this request to plug into the outstanding I/O requests for a block device & backend. - fn track_outstanding(&mut self, outstanding: Arc) { - let old = std::mem::replace(&mut self.outstanding, Some(outstanding)); - assert!(old.is_none(), "request already tracked"); - } } impl Drop for Request { fn drop(&mut self) { - if self.payload.is_some() { + if self.marker.is_some() { panic!("request dropped prior to completion"); } } } /// Metadata regarding a virtualized block device. -#[derive(Debug, Copy, Clone)] +#[derive(Default, Debug, Copy, Clone)] pub struct DeviceInfo { /// Size (in bytes) per block pub block_size: u32, @@ -196,738 +212,76 @@ pub struct BackendOpts { /// API to access a virtualized block device. pub trait Device: Send + Sync + 'static { - /// Retreive the next request (if any) + fn attachment(&self) -> &device::Attachment; + + /// Retrieve the next request (if any) fn next(&self) -> Option; /// Complete processing of result - fn complete(&self, op: Operation, res: Result, payload: Box); + fn complete(&self, res: Result, id: ReqId); /// Get an accessor to guest memory via the underlying device fn accessor_mem(&self) -> MemAccessor; - fn set_notifier(&self, f: Option>); + /// Optional on-attach handler to update device state with new `DeviceInfo` + fn attach(&self, _info: DeviceInfo) {} } pub trait Backend: Send + Sync + 'static { - fn attach(&self, dev: Arc) -> std::io::Result<()>; - fn info(&self) -> DeviceInfo; - /// Process a block device request. It is expected that the `Backend` - /// itself will call this through some queuing driver apparatus, rather than - /// the block device emulation itself. - fn process(&self, req: &Request, mem: &MemCtx) -> Result; -} - -pub type NotifierFn = dyn Fn(&dyn Device) + Send + Sync + 'static; - -/// Helper type for keeping track of outstanding I/O requests received -/// from the block device and given to the block backend. -struct OutstandingReqs { - /// Count of how many outstanding I/O requests there currently are - count: AtomicU64, - - /// Notifier to indicate all outstanding requests have been completed - notifier: Mutex>>, -} - -impl OutstandingReqs { - fn new() -> Self { - OutstandingReqs { count: AtomicU64::new(0), notifier: Mutex::default() } - } - - /// Increment the outstanding I/O count and update the `Request` so that it can - /// decrement the count once it has been completed. - fn increment_and_track(self: &Arc, req: &mut Request) { - // Update outstanding count and update `Request` to track it - self.count.fetch_add(1, Ordering::Relaxed); - req.track_outstanding(Arc::clone(self)); - } - - /// Decrement the outstanding I/O count. - /// - /// If a notifier was since attached and we hit 0 outstanding requests as part - /// of decrementing, then we also make sure to trigger the notifier indicating - /// all currently outstanding requests have been completed. - fn decrement(&self) { - if self.count.fetch_sub(1, Ordering::Release) == 1 { - std::sync::atomic::fence(Ordering::Acquire); - if let Some(notifier) = &*self.notifier.lock().unwrap() { - notifier.notify_one(); - } - } - } - - /// Create a new `Notify` object available to the next [`OutstandingReqs::decrement`] call. - /// - /// We create this explicitly not in `new` so that hitting 0 outstanding requests - /// during the normal course of operation doesn't store a permit in the Notify and - /// cause our later `notified()` future to complete too early. - fn create_notifier(&self) { - let mut notifier = self.notifier.lock().unwrap(); - assert!( - notifier.is_none(), - "outstanding requests notifier already exists" - ); - - let notify = Arc::new(Notify::new()); - *notifier = Some(Arc::clone(¬ify)); - - if self.count.load(Ordering::Acquire) == 0 { - // If we hit 0 outstanding requests right before we created the notifier - // above, then we need to make sure there's a permit available. - notify.notify_one(); - } - } - - /// Removes the `Notify` object associated with this counter. - /// - /// # Panics - /// - /// This routine panics if no notification was set or if there were - /// outstanding notifications on this counter. See [`Notifier::resume`]. - fn remove_notifier(&self) { - let mut notifier = self.notifier.lock().unwrap(); - assert!(notifier.is_some(), "notifier must exist to be removed"); - - // Users of the counter create a notifier when pausing a backend, wait - // for its outstanding count to hit 0, and then resume it only - // afterward. The pause-and-wait logic has barriers sufficient to ensure - // that the count has observably reached 0 before any thread will - // observe that the no-requests condition is fulfilled. - assert_eq!(self.count.load(Ordering::Relaxed), 0); - notifier.take(); - } - - /// Returns a future indicating when all outstanding requests have been completed. - fn all_completed(&self) -> Option> { - match &*self.notifier.lock().unwrap() { - Some(notify) => { - let notify = Arc::clone(notify); - Some(Box::pin(async move { notify.notified().await })) - } - None => None, - } - } -} - -/// Notifier help by every block device used to indicate -/// to the corresponding block backends about I/O requests. -pub struct Notifier { - /// Flag used to coalesce request notifications from the block device. - /// - /// If set, we've previously been notified but the backend has yet to - /// respond to the latest notification. - armed: AtomicBool, - - /// The backend specific notification handler - notifier: Mutex>>, - - /// Whether or not we should service I/O requests - paused: AtomicBool, - - /// Book-keeping for tracking outstanding requests from the block device - outstanding: Arc, -} - -impl Notifier { - pub fn new() -> Self { - Self { - armed: AtomicBool::new(false), - notifier: Mutex::new(None), - paused: AtomicBool::new(false), - outstanding: Arc::new(OutstandingReqs::new()), - } - } - pub fn next_arming( - &self, - nextf: impl Fn() -> Option, - ) -> Option { - if self.paused.load(Ordering::Acquire) { - return None; - } - - self.armed.store(false, Ordering::Release); - if let Some(mut req) = nextf() { - // Update outstanding count and update `Request` to track it - self.outstanding.increment_and_track(&mut req); - - // Since a Request was successfully retrieved, no need to rearm the - // notification trigger, just return the Request to the backend - return Some(req); - } - - // On the off chance that the underlying resource became available after - // rearming the notification trigger, check again. - self.armed.store(true, Ordering::Release); - if let Some(mut req) = nextf() { - // Update outstanding count and update `Request` to track it - self.outstanding.increment_and_track(&mut req); - - self.armed.store(false, Ordering::Release); - Some(req) - } else { - None - } - } - pub fn notify(&self, dev: &dyn Device) { - if self.armed.load(Ordering::Acquire) { - let inner = self.notifier.lock().unwrap(); - if let Some(func) = inner.as_ref() { - func(dev); - } - } - } - pub fn set(&self, val: Option>) { - let mut inner = self.notifier.lock().unwrap(); - *inner = val; - } - - /// Stop accepting requests from the block device. - /// - /// Given there might be in-flight requests being handled by the backend, - /// the `Notifier::paused` method returns a future indicating - /// when the pause operation is complete. - pub fn pause(&self) { - // Stop responding to any requests - let paused = self.paused.swap(true, Ordering::Release); - - // Should not be attempting to pause while already paused - assert!(!paused); - - // Hook up the notifier so that we know when we hit 0 outstanding - // requeusts from this point on - self.outstanding.create_notifier(); - } - - /// Returns a future indicating when all outstanding requests have been completed. - pub fn paused(&self) -> BoxFuture<'static, ()> { - assert!(self.paused.load(Ordering::Relaxed)); + fn attachment(&self) -> &backend::Attachment; - self.outstanding - .all_completed() - .expect("missing outstanding requests notifier") - } - - /// Resume accepting requests from the block device. - /// - /// # Panics - /// - /// This routine assumes that on entry, this notifier was previously asked - /// to pause and that the pause requestor waited for the notifier to pause - /// fully before resuming. To that end, this routine asserts that - /// - /// - this object has an active notification channel, and - /// - this object is not tracking any outstanding requests. - pub fn resume(&self) { - self.outstanding.remove_notifier(); - let paused = self.paused.swap(false, Ordering::Release); - assert!(paused); - } + fn info(&self) -> DeviceInfo; } -/// Driver used to service requests from a block device with a specific backend. -pub struct Driver { - inner: Arc, - outer: Weak, - name: String, +pub enum CacheMode { + Synchronous, + WriteBack, } -impl Driver { - /// Create new `BackendDriver` to service requests for the given block device. - pub fn new( - outer: Weak, - name: String, - worker_count: NonZeroUsize, - ) -> Self { - Self { - inner: Arc::new(DriverInner { - bdev: OnceLock::new(), - acc_mem: OnceLock::new(), - queue: Mutex::new(VecDeque::new()), - cv: Condvar::new(), - idle_threads: Semaphore::new(0), - wake: Arc::new(Notify::new()), - task_ctrl: Mutex::new(DriverCtrls::new(worker_count)), - }), - outer, - name, - } - } - - /// Attach driver to emulated device and spawn worker tasks for processing requests. - pub fn attach(&self, bdev: Arc) -> std::io::Result<()> { - let be = self.outer.upgrade().expect("backend must exist"); - - let _old_bdev = self.inner.bdev.set(bdev.clone()); - assert!(_old_bdev.is_ok(), "driver already attached"); - let _old_acc = self.inner.acc_mem.set(bdev.accessor_mem()); - assert!(_old_acc.is_ok(), "driver already attached"); - - // Wire up notifier to the block device - let wake = self.inner.wake.clone(); - bdev.set_notifier(Some(Box::new(move |_bdev| wake.notify_one()))); - - // Spawn (held) worker tasks - let mut tasks = self.inner.task_ctrl.lock().unwrap(); - for (i, ctrl_slot) in tasks.workers.iter_mut().enumerate() { - let (mut task, ctrl) = TaskHdl::new_held(Some(self.worker_waker())); - - let worker_self = Arc::clone(&self.inner); - let worker_be = be.clone(); - let _join = std::thread::Builder::new() - .name(format!("{} {i}", self.name)) - .spawn(move || { - worker_self.process_requests(&mut task, worker_be); - })?; - let old = ctrl_slot.replace(ctrl); - assert!(old.is_none(), "worker {} task already exists", 1); - } - - // Create async task to get requests from block device and feed the worker threads - let sched_self = Arc::clone(&self.inner); - let (mut task, ctrl) = TaskHdl::new_held(None); - let _join = tokio::spawn(async move { - let _ = sched_self.schedule_requests(&mut task).await; - }); - let old = tasks.sched.replace(ctrl); - assert!(old.is_none(), "sched task already exists"); - - Ok(()) - } - - fn worker_waker(&self) -> Box { - let this = Arc::downgrade(&self.inner); - - Box::new(move || { - if let Some(this) = this.upgrade() { - // Take the queue lock in order to synchronize notification with - // any activity in the processing thread. - let _guard = this.queue.lock().unwrap(); - this.cv.notify_all() - } - }) - } +/// Attach a block backend to a corresponding device +/// +/// # Panics +/// +/// If `backend` or `device` are already attached +pub fn attach(backend: Arc, device: Arc) { + let dev_attach = device.attachment(); + let backend_attach = backend.attachment(); - pub fn start(&self) { - // The driver boots up in a paused state, so the start transition is - // equivalent to resuming. - self.resume(); - } - pub fn resume(&self) { - let mut ctrls = self.inner.task_ctrl.lock().unwrap(); - let _ = ctrls.sched.as_mut().unwrap().run(); - for worker in ctrls.workers.iter_mut() { - let _ = worker.as_mut().unwrap().run(); - } - } - pub fn pause(&self) { - let mut ctrls = self.inner.task_ctrl.lock().unwrap(); - let _ = ctrls.sched.as_mut().unwrap().hold(); - for worker in ctrls.workers.iter_mut() { - let _ = worker.as_mut().unwrap().hold(); - } - } - pub fn halt(&self) { - let mut ctrls = self.inner.task_ctrl.lock().unwrap(); - ctrls.sched.take().unwrap().exit(); - for worker in ctrls.workers.iter_mut() { - worker.take().unwrap().exit(); - } - } -} + let mut devlock = dev_attach.0.lock().unwrap(); + let mut belock = backend_attach.0.state.lock().unwrap(); -struct DriverCtrls { - sched: Option, - workers: Vec>, -} -impl DriverCtrls { - fn new(sz: NonZeroUsize) -> Self { - let mut workers = Vec::with_capacity(sz.get()); - workers.resize_with(sz.get(), Default::default); - Self { sched: None, workers } + if devlock.is_some() { + panic!("device is already attached"); } -} - -struct DriverInner { - /// The block device providing the requests to be serviced - bdev: OnceLock>, - - /// Memory accessor through the underlying device - acc_mem: OnceLock, - - /// Queue of I/O requests from the device ready to be serviced by the backend - queue: Mutex>, - - /// Sync for blocking backend worker threads on requests in the queue - cv: Condvar, - - /// Semaphore to block polling block device unless we have idle backend worker threads - idle_threads: Semaphore, - - /// Notify handle used by block device to proactively inform us of any new requests - wake: Arc, - - /// Task control handles for workers - task_ctrl: Mutex, -} -impl DriverInner { - /// Worker thread's main-loop: looks for requests to service in the queue. - fn process_requests(&self, task: &mut TaskHdl, be: Arc) { - let mut idled = false; - loop { - // Heed task events - match task.pending_event() { - Some(Event::Hold) => { - task.hold(); - continue; - } - Some(Event::Exit) => return, - _ => {} - } - - let bdev = self.bdev.get().expect("attached block device").as_ref(); - let acc_mem = self.acc_mem.get().expect("attached memory accessor"); - // Check if we've received any requests to process - let mut guard = self.queue.lock().unwrap(); - if let Some(req) = guard.pop_front() { - drop(guard); - idled = false; - match acc_mem.access() { - Some(mem) => { - let result = be.process(&req, &mem); - req.complete(result, bdev); - } - None => { - req.complete(Result::Failure, bdev); - } - } - } else { - // Wait until more requests are available - if !idled { - self.idle_threads.add_permits(1); - idled = true; - } - let _guard = self - .cv - .wait_while(guard, |g| { - // Be cognizant of task events in addition to the state - // of the queue. - g.is_empty() && task.pending_event().is_none() - }) - .unwrap(); - } - } + if belock.is_some() { + panic!("backend is already attached"); } - /// Attempt to grab a request from the block device (if one's available) - async fn next_req(&self) -> Request { - let bdev = self.bdev.get().expect("attached block device"); - loop { - if let Some(req) = bdev.next() { - return req; - } + *devlock = Some(device::AttachInner::new(&backend_attach, &backend)); + *belock = Some(backend::AttachState::new(&dev_attach, &device)); - // Don't busy-loop on the device but just wait for it to wake us - // when the next request is available - self.wake.notified().await; - } - } - - /// Scheduling task body: feed worker threads with requests from block device. - async fn schedule_requests(&self, task: &mut TaskHdl) { - loop { - let avail = tokio::select! { - event = task.get_event() => { - match event { - Event::Hold => { - task.wait_held().await; - continue; - } - Event::Exit => { - return; - } - } - }, - avail = self.idle_threads.acquire() => { - // We found an idle thread! - avail.unwrap() - } - }; - - tokio::select! { - _event = task.get_event() => { - // Take another lap if an event arrives while we are waiting - // for a request to schedule to the worker - continue; - }, - req = self.next_req() => { - // With a request in hand, we can discard the permit for the - // idle thread which is about to pick up the work. - avail.forget(); - - // Put the request on the queue for processing - let mut queue = self.queue.lock().unwrap(); - queue.push_back(req); - drop(queue); - self.cv.notify_one(); - } - } - } - } + // notify device that it has become attached + let binfo = backend.info(); + device.attach(binfo); } -/// Scheduler for block backends which process requests asynchronously, rather -/// than in worker threads. -pub struct Scheduler(Arc); -impl Scheduler { - pub fn new() -> Self { - Self(Inner::new()) - } - pub fn attach(&self, bdev: Arc) { - self.0.attach(bdev); - } - pub fn worker(&self) -> WorkerHdl { - self.0.worker() - } +/// Unique ID assigned (by [`device::Tracking`] to a given block [`Request`]. +#[derive(Copy, Clone, PartialEq, PartialOrd, Eq, Ord)] +pub struct ReqId(u64); +impl ReqId { + const START: Self = ReqId(1); - pub fn start(&self) { - self.0.start(); - } - pub fn pause(&self) { - self.0.pause(); - } - pub fn resume(&self) { - self.0.resume(); - } - pub fn halt(&self) { - self.0.halt(); + fn advance(&mut self) { + self.0 += 1; } } - -/// Worker (associated with a [Scheduler]) to accept and process [Request]s -/// asynchronously. -pub struct WorkerHdl { - parent: Arc, - active_req: Option, -} -impl WorkerHdl { - fn new(parent: Arc) -> Self { - Self { parent, active_req: None } - } - - /// Fetch the next request from the [Scheduler]. Only one [Request] may be - /// processed per worker at any given time. - /// - /// Returns `None` if the backend is being halted - pub async fn next(&mut self) -> Option<(&Request, AccessorGuard)> { - assert!(self.active_req.is_none(), "only one active request at a time"); - - self.parent.worker_idle.add_permits(1); - match self.parent.worker_activate.acquire().await { - Ok(permit) => permit.forget(), - Err(_) => { - // Tasks were signaled to bail out - return None; - } - } - - // A valid task activation permit means there should be a request - // waiting for us in the queue. - let mut queue = self.parent.queue.lock().await; - let req = queue.pop_front().unwrap(); - drop(queue); - self.active_req = Some(req); - - let acc_mem = self.parent.acc_mem.get().expect("bdev is attached"); - if let Some(guard) = acc_mem.access() { - Some((self.active_req.as_ref().unwrap(), guard)) - } else { - let req = self.active_req.take().unwrap(); - self.parent.complete_req(req, Result::Failure); - None - } - } - - /// After processing a [Request], submit the result to the block frontend, - /// making this worker ready to poll for another request via - /// [WorkerHdl::next]. - pub fn complete(&mut self, res: Result) { - let req = self.active_req.take().expect("request should be active"); - self.parent.complete_req(req, res); +impl Borrow for ReqId { + fn borrow(&self) -> &u64 { + &self.0 } } - -struct Inner { - /// The block device providing the requests to be serviced - bdev: OnceLock>, - - /// Task control for work-scheduling task - sched_ctrl: Mutex>, - - /// Memory accessor through the underlying device - acc_mem: OnceLock, - - /// Notifier used to both respond to the block frontend when it has requests - /// available for processing, as well as internally when waiting for workers - /// to complete pending processing. - wake: Arc, - - /// Queue of I/O requests from the device ready to be serviced by the backend - queue: TokioMutex>, - - /// Semaphore to gate polling of frontend on availability of idle workers - worker_idle: Semaphore, - - /// Semaphore to activate idle worker to pull request from the queue - worker_activate: Semaphore, - - /// Number of requests being actively serviced - active_req_count: AtomicUsize, -} - -impl Inner { - fn new() -> Arc { - Arc::new(Self { - bdev: OnceLock::new(), - acc_mem: OnceLock::new(), - sched_ctrl: Mutex::new(None), - - queue: TokioMutex::new(VecDeque::new()), - - wake: Arc::new(Notify::new()), - worker_idle: Semaphore::new(0), - worker_activate: Semaphore::new(0), - - active_req_count: AtomicUsize::new(0), - }) - } - - fn attach(self: &Arc, bdev: Arc) { - let _old_bdev = self.bdev.set(bdev.clone()); - assert!(_old_bdev.is_ok(), "driver already attached"); - let _old_acc = self.acc_mem.set(bdev.accessor_mem()); - assert!(_old_acc.is_ok(), "driver already attached"); - - let sched_wake = self.wake.clone(); - let (mut thdl, tctrl) = - TaskHdl::new_held(Some(Box::new(move || sched_wake.notify_one()))); - let sched_self = Arc::clone(self); - tokio::spawn(async move { - sched_self.schedule_requests(&mut thdl).await; - }); - let _old_ctrl = self.sched_ctrl.lock().unwrap().replace(tctrl); - assert!(_old_ctrl.is_none(), "driver already attached"); - - let wake_self = self.wake.clone(); - bdev.set_notifier(Some(Box::new(move |_bdev| wake_self.notify_one()))); - } - - fn with_ctrl(&self, f: impl FnOnce(&mut TaskCtrl)) { - f(self.sched_ctrl.lock().unwrap().as_mut().expect("scheduler started")) - } - - fn start(&self) { - self.with_ctrl(|ctrl| { - let _ = ctrl.run(); - }); - } - fn pause(&self) { - self.with_ctrl(|ctrl| { - let _ = ctrl.hold(); - }); - } - fn resume(&self) { - self.with_ctrl(|ctrl| { - let _ = ctrl.run(); - }); - } - fn halt(&self) { - self.with_ctrl(|ctrl| { - let _ = ctrl.exit(); - }); - } - - fn worker(self: &Arc) -> WorkerHdl { - WorkerHdl::new(self.clone()) - } - - /// Attempt to grab a request from the block device (if one's available) - async fn next_req(&self) -> Request { - let bdev = self.bdev.get().expect("attached block device"); - loop { - if let Some(req) = bdev.next() { - return req; - } - - // Don't busy-loop on the device but just wait for it to wake us - // when the next request is available - self.wake.notified().await; - } - } - - fn complete_req(&self, req: Request, res: Result) { - let bdev = self.bdev.get().expect("attached bdev"); - req.complete(res, bdev.as_ref()); - match self.active_req_count.fetch_sub(1, Ordering::AcqRel) { - 0 => panic!("request completion count mismatch"), - 1 => self.wake.notify_one(), - _ => {} - } - } - - /// Scheduling task body: feed worker threads with requests from block device. - async fn schedule_requests(&self, task: &mut TaskHdl) { - loop { - let avail = tokio::select! { - event = task.get_event() => { - match event { - Event::Hold => { - // Wait for all pending requests to complete - let areq = &self.active_req_count; - while areq.load(Ordering::Acquire) != 0 { - self.wake.notified().await; - } - - // ... then enter held state - task.wait_held().await; - continue; - } - Event::Exit => { - // Close the semaphore to indicate to the workers - // that they should bail out too - self.worker_activate.close(); - return; - } - } - }, - avail = self.worker_idle.acquire() => { - // We found an idle thread! - avail.unwrap() - } - }; - - tokio::select! { - _event = task.get_event() => { - // Take another lap if an event arrives while we are waiting - // for a request to schedule to the worker. Since task - // events are level-triggered, we can leave the processing - // to the get_event() call at the top of the loop. - continue; - }, - req = self.next_req() => { - // With a request in hand, we can discard the permit for the - // idle thread which is about to pick up the work. - avail.forget(); - - // Track pending number of requests - self.active_req_count.fetch_add(1, Ordering::AcqRel); - - // Put the request on the queue for processing - let mut queue = self.queue.lock().await; - queue.push_back(req); - drop(queue); - self.worker_activate.add_permits(1); - } - } - } +impl From for u64 { + fn from(value: ReqId) -> Self { + value.0 } } diff --git a/lib/propolis/src/hw/nvme/cmds.rs b/lib/propolis/src/hw/nvme/cmds.rs index d72c9ad26..1a594bf71 100644 --- a/lib/propolis/src/hw/nvme/cmds.rs +++ b/lib/propolis/src/hw/nvme/cmds.rs @@ -848,6 +848,10 @@ impl From for Completion { block::Result::Failure => { Completion::generic_err(bits::STS_DATA_XFER_ERR) } + block::Result::ReadOnly => Completion::specific_err( + bits::StatusCodeType::CmdSpecific, + bits::STS_WRITE_READ_ONLY_RANGE, + ), block::Result::Unsupported => Completion::specific_err( bits::StatusCodeType::CmdSpecific, bits::STS_READ_CONFLICTING_ATTRS, diff --git a/lib/propolis/src/hw/nvme/mod.rs b/lib/propolis/src/hw/nvme/mod.rs index 119d5eced..0ecc1ad38 100644 --- a/lib/propolis/src/hw/nvme/mod.rs +++ b/lib/propolis/src/hw/nvme/mod.rs @@ -4,7 +4,7 @@ use std::convert::TryInto; use std::mem::size_of; -use std::sync::{Arc, Mutex, MutexGuard}; +use std::sync::{Arc, Mutex, MutexGuard, Weak}; use crate::accessors::Guard; use crate::block; @@ -27,7 +27,7 @@ mod queue; mod requests; use bits::*; -use queue::{CompQueue, QueueId, SubQueue}; +use queue::{CompQueue, Permit, QueueId, SubQueue}; #[usdt::provider(provider = "propolis")] mod probes { @@ -166,12 +166,6 @@ struct NvmeCtrl { /// The Identify structure returned for Identify namespace commands ns_ident: IdentifyNamespace, - - /// Underlying Block Device info - binfo: block::DeviceInfo, - - /// Whether or not we should service guest commands - paused: bool, } impl NvmeCtrl { @@ -417,6 +411,18 @@ impl NvmeCtrl { b << (self.ns_ident.lbaf[(self.ns_ident.flbas & 0xF) as usize]).lbads } + fn update_block_info(&mut self, info: block::DeviceInfo) { + let nsze = info.total_size; + self.ns_ident = bits::IdentifyNamespace { + // No thin provisioning so nsze == ncap == nuse + nsze, + ncap: nsze, + nuse: nsze, + ..self.ns_ident + }; + self.ns_ident.lbaf[0].lbads = info.block_size.trailing_zeros() as u8; + } + fn export(&self) -> migrate::NvmeCtrlV1 { let cqs = self.cqs.iter().flatten().map(|cq| cq.export()).collect(); let sqs = self.sqs.iter().flatten().map(|sq| sq.export()).collect(); @@ -477,23 +483,20 @@ pub struct PciNvme { /// NVMe Controller state: Mutex, - /// Underlying Block Device notifier - notifier: block::Notifier, - /// PCI device state pci_state: pci::DeviceState, + block_attach: block::device::Attachment, + + block_tracking: block::device::Tracking, + /// Logger resource log: slog::Logger, } impl PciNvme { /// Create a new pci-nvme device with the given values - pub fn create( - serial_number: String, - binfo: block::DeviceInfo, - log: slog::Logger, - ) -> Arc { + pub fn create(serial_number: String, log: slog::Logger) -> Arc { let builder = pci::Builder::new(pci::Ident { vendor_id: VENDOR_OXIDE, device_id: PROPOLIS_NVME_DEV_ID, @@ -535,31 +538,15 @@ impl PciNvme { ..Default::default() }; - // Initialize the Identify structure returned when the host issues - // an Identify Namespace command. - let nsze = binfo.total_size; - let mut ns_ident = bits::IdentifyNamespace { - // No thin provisioning so nsze == ncap == nuse - nsze, - ncap: nsze, - nuse: nsze, + // The Identify structure (returned by Identify command issued by guest) + // will be further updated when a backend is attached to make the + // underlying device info available. + let ns_ident = bits::IdentifyNamespace { nlbaf: 0, // We only support a single LBA format (1 but 0-based) flbas: 0, // And it is at index 0 in the lbaf array ..Default::default() }; - // Update the block format we support - debug_assert!( - binfo.block_size.is_power_of_two(), - "binfo.block_size must be a power of 2" - ); - debug_assert!( - binfo.block_size >= 512, - "binfo.block_size must be at least 512 bytes" - ); - - ns_ident.lbaf[0].lbads = binfo.block_size.trailing_zeros() as u8; - // Initialize the CAP "register" leaving most values // at their defaults (0): // TO = 0 => 0ms to wait for controller to be ready @@ -599,8 +586,6 @@ impl PciNvme { sqs: Default::default(), ctrl_ident, ns_ident, - binfo, - paused: false, }; let pci_state = builder @@ -611,10 +596,13 @@ impl PciNvme { .add_cap_msix(pci::BarN::BAR4, NVME_MSIX_COUNT) .finish(); - Arc::new(PciNvme { + Arc::new_cyclic(|weak| PciNvme { state: Mutex::new(state), - notifier: block::Notifier::new(), pci_state, + block_attach: block::device::Attachment::new(), + block_tracking: block::device::Tracking::new( + weak.clone() as Weak + ), log, }) } @@ -851,21 +839,16 @@ impl PciNvme { // We may have skipped pulling entries off some SQ due to this // CQ having no available entry slots. Since we've just freed - // up some slots, kick the SQs (excl. admin) here just in case. - // TODO: worth kicking only the SQs specifically associated - // with this CQ? - if !state.paused && cq.kick() { - self.notifier.notify(self); - } + // up some slots, notify any attached block backend that + // there may be new requests available. + self.block_attach.notify(); } else { // Submission Queue y Tail Doorbell let sq = state.get_sq(qid)?; sq.notify_tail(val)?; - // Poke block device to service new requests - if !state.paused { - self.notifier.notify(self); - } + // Poke block backend to service new requests + self.block_attach.notify(); } } } @@ -879,10 +862,6 @@ impl PciNvme { mut state: MutexGuard, sq: Arc, ) -> Result<(), NvmeError> { - if state.paused { - return Ok(()); - } - // Grab the Admin CQ too let cq = state.get_admin_cq()?; @@ -1036,29 +1015,15 @@ impl Entity for PciNvme { } fn pause(&self) { - let mut ctrl = self.state.lock().unwrap(); - - // Stop responding to any requests - assert!(!ctrl.paused); - ctrl.paused = true; - - self.notifier.pause(); + self.block_attach.pause(); } fn resume(&self) { - let mut ctrl = self.state.lock().unwrap(); - - assert!(ctrl.paused); - ctrl.paused = false; - - self.notifier.resume(); + self.block_attach.resume(); } fn paused(&self) -> BoxFuture<'static, ()> { - let ctrl = self.state.lock().unwrap(); - assert!(ctrl.paused); - - Box::pin(self.notifier.paused()) + Box::pin(self.block_tracking.none_outstanding()) } fn migrate(&self) -> Migrator { diff --git a/lib/propolis/src/hw/nvme/requests.rs b/lib/propolis/src/hw/nvme/requests.rs index 51c6007f5..f6733e98d 100644 --- a/lib/propolis/src/hw/nvme/requests.rs +++ b/lib/propolis/src/hw/nvme/requests.rs @@ -4,7 +4,7 @@ use crate::{ accessors::MemAccessor, - block::{self, BlockPayload, Operation, Request, Result as BlockResult}, + block::{self, Operation, Request, Result as BlockResult}, hw::nvme::{bits, cmds::Completion}, }; @@ -31,42 +31,36 @@ mod probes { } } -type NvmeBlockPayload = Permit; - impl block::Device for PciNvme { + fn attachment(&self) -> &block::device::Attachment { + &self.block_attach + } + + fn attach(&self, info: block::DeviceInfo) { + self.state.lock().unwrap().update_block_info(info); + } + fn next(&self) -> Option { - self.notifier.next_arming(|| self.next_req()) + let (req, permit) = self.next_req()?; + Some(self.block_tracking.track(req, permit)) } - fn complete( - &self, - op: Operation, - res: BlockResult, - payload: Box, - ) { - let payload: Box = - payload.downcast().expect("payload must be correct type"); - self.complete_req(op, res, *payload); + fn complete(&self, res: BlockResult, id: block::ReqId) { + let (op, permit) = self.block_tracking.complete(id, res); + self.complete_req(op, res, permit); } fn accessor_mem(&self) -> MemAccessor { self.pci_state.acc_mem.child(Some("block backend".to_string())) } - - fn set_notifier(&self, f: Option>) { - self.notifier.set(f) - } } impl PciNvme { /// Pop an available I/O request off of a Submission Queue to begin /// processing by the underlying Block Device. - fn next_req(&self) -> Option { + fn next_req(&self) -> Option<(Request, Permit)> { let state = self.state.lock().unwrap(); - // We shouldn't be called while paused - assert!(!state.paused, "I/O requested while device paused"); - let mem = self.mem_access()?; // Go through all the queues (skip admin as we just want I/O queues) @@ -87,23 +81,6 @@ impl PciNvme { let cmd = NvmCmd::parse(sub); match cmd { - Ok(NvmCmd::Write(cmd)) if state.binfo.read_only => { - let off = state.nlb_to_size(cmd.slba as usize) as u64; - let size = state.nlb_to_size(cmd.nlb as usize) as u64; - probes::nvme_write_enqueue!(|| ( - qid, idx, cid, off, size - )); - let comp = Completion::specific_err( - bits::StatusCodeType::CmdSpecific, - bits::STS_WRITE_READ_ONLY_RANGE, - ); - probes::nvme_write_complete!(|| ( - qid, - cid, - BlockResult::Failure as u8, - )); - permit.complete(comp, Some(&mem)); - } Ok(NvmCmd::Write(cmd)) => { let off = state.nlb_to_size(cmd.slba as usize) as u64; let size = state.nlb_to_size(cmd.nlb as usize) as u64; @@ -114,10 +91,10 @@ impl PciNvme { let bufs = cmd.data(size, &mem).collect(); let req = Request::new_write( off as usize, + size as usize, bufs, - Box::new(permit), ); - return Some(req); + return Some((req, permit)); } Ok(NvmCmd::Read(cmd)) => { let off = state.nlb_to_size(cmd.slba as usize) as u64; @@ -130,15 +107,15 @@ impl PciNvme { let bufs = cmd.data(size, &mem).collect(); let req = Request::new_read( off as usize, + size as usize, bufs, - Box::new(permit), ); - return Some(req); + return Some((req, permit)); } Ok(NvmCmd::Flush) => { probes::nvme_flush_enqueue!(|| (qid, idx, cid)); - let req = Request::new_flush(Box::new(permit)); - return Some(req); + let req = Request::new_flush(); + return Some((req, permit)); } Ok(NvmCmd::Unknown(_)) | Err(_) => { // For any other unrecognized or malformed command, diff --git a/lib/propolis/src/hw/virtio/block.rs b/lib/propolis/src/hw/virtio/block.rs index d45e389e2..786d65d04 100644 --- a/lib/propolis/src/hw/virtio/block.rs +++ b/lib/propolis/src/hw/virtio/block.rs @@ -3,7 +3,7 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. use std::num::NonZeroU16; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use crate::accessors::MemAccessor; use crate::block; @@ -35,11 +35,11 @@ pub struct PciVirtioBlock { virtio_state: PciVirtioState, pci_state: pci::DeviceState, - info: block::DeviceInfo, - notifier: block::Notifier, + block_attach: block::device::Attachment, + block_tracking: block::device::Tracking, } impl PciVirtioBlock { - pub fn new(queue_size: u16, info: block::DeviceInfo) -> Arc { + pub fn new(queue_size: u16) -> Arc { let queues = VirtQueues::new( NonZeroU16::new(queue_size).unwrap(), NonZeroU16::new(1).unwrap(), @@ -57,12 +57,19 @@ impl PciVirtioBlock { VIRTIO_BLK_CFG_SIZE, ); - let notifier = block::Notifier::new(); - Arc::new(Self { pci_state, virtio_state, info, notifier }) + Arc::new_cyclic(|weak| Self { + pci_state, + virtio_state, + block_attach: block::device::Attachment::new(), + block_tracking: block::device::Tracking::new( + weak.clone() as Weak + ), + }) } fn block_cfg_read(&self, id: &BlockReg, ro: &mut ReadOp) { - let info = self.info; + let info = self.block_attach.info().unwrap_or_else(Default::default); + let total_bytes = info.total_size * info.block_size as u64; match id { BlockReg::Capacity => { @@ -110,10 +117,9 @@ impl PciVirtioBlock { probes::vioblk_read_enqueue!(|| ( rid, off as u64, sz as u64 )); - Ok(block::Request::new_read( - off, - regions, - Box::new(CompletionPayload { rid, chain }), + Ok(self.block_tracking.track( + block::Request::new_read(off, sz, regions), + CompletionPayload { rid, chain }, )) } else { Err(chain) @@ -128,10 +134,9 @@ impl PciVirtioBlock { probes::vioblk_write_enqueue!(|| ( rid, off as u64, sz as u64 )); - Ok(block::Request::new_write( - off, - regions, - Box::new(CompletionPayload { rid, chain }), + Ok(self.block_tracking.track( + block::Request::new_write(off, sz, regions), + CompletionPayload { rid, chain }, )) } else { Err(chain) @@ -139,10 +144,10 @@ impl PciVirtioBlock { } VIRTIO_BLK_T_FLUSH => { probes::vioblk_flush_enqueue!(|| (rid)); - Ok(block::Request::new_flush(Box::new(CompletionPayload { - rid, - chain, - }))) + Ok(self.block_tracking.track( + block::Request::new_flush(), + CompletionPayload { rid, chain }, + )) } _ => Err(chain), }; @@ -173,6 +178,7 @@ impl PciVirtioBlock { let resnum = match res { block::Result::Success => VIRTIO_BLK_S_OK, block::Result::Failure => VIRTIO_BLK_S_IOERR, + block::Result::ReadOnly => VIRTIO_BLK_S_IOERR, block::Result::Unsupported => VIRTIO_BLK_S_UNSUPP, }; match op { @@ -206,7 +212,8 @@ impl VirtioDevice for PciVirtioBlock { feat |= VIRTIO_BLK_F_SEG_MAX; feat |= VIRTIO_BLK_F_FLUSH; - if self.info.read_only { + let info = self.block_attach.info().unwrap_or_else(Default::default); + if info.read_only { feat |= VIRTIO_BLK_F_RO; } feat @@ -216,7 +223,7 @@ impl VirtioDevice for PciVirtioBlock { } fn queue_notify(&self, _vq: &Arc) { - self.notifier.notify(self); + self.block_attach.notify() } } impl PciVirtio for PciVirtioBlock { @@ -228,29 +235,23 @@ impl PciVirtio for PciVirtioBlock { } } impl block::Device for PciVirtioBlock { + fn attachment(&self) -> &block::device::Attachment { + &self.block_attach + } + fn next(&self) -> Option { - self.notifier.next_arming(|| self.next_req()) + self.next_req() } - fn complete( - &self, - op: block::Operation, - res: block::Result, - payload: Box, - ) { - let mut payload: Box = - payload.downcast().expect("payload must be correct type"); - let CompletionPayload { rid, ref mut chain } = *payload; + fn complete(&self, res: block::Result, id: block::ReqId) { + let (op, mut payload) = self.block_tracking.complete(id, res); + let CompletionPayload { rid, ref mut chain } = payload; self.complete_req(rid, op, res, chain); } fn accessor_mem(&self) -> MemAccessor { self.pci_state.acc_mem.child(Some("block backend".to_string())) } - - fn set_notifier(&self, val: Option>) { - self.notifier.set(val); - } } impl Entity for PciVirtioBlock { fn type_name(&self) -> &'static str { @@ -260,13 +261,13 @@ impl Entity for PciVirtioBlock { self.virtio_state.reset(self); } fn pause(&self) { - self.notifier.pause(); + self.block_attach.pause(); } fn resume(&self) { - self.notifier.resume(); + self.block_attach.resume(); } fn paused(&self) -> BoxFuture<'static, ()> { - Box::pin(self.notifier.paused()) + Box::pin(self.block_tracking.none_outstanding()) } fn migrate(&self) -> Migrator { Migrator::Multi(self)