From a6624f1ad53a59ae68bd75c1b45c624d7b95979e Mon Sep 17 00:00:00 2001
From: Matt Keeter <matt@oxide.computer>
Date: Tue, 12 Nov 2024 11:15:11 -0500
Subject: [PATCH] Remove delay-based backpressure in favor of explicit queue
 limits (#1515)

Right now, we have a backpressure delay that's a function of `(job count,
bytes in flight)`.  The actual queue length is set implicitly by the shape
of that function (e.g. its gain) and the actual downstairs IO time.  If a
downstairs slows down due to load, then the queue length has to grow to
compensate!

Having an implicitly-defined queue length is tricky, because it's not well
controlled.  Queue length also affects latency in subtle ways, so directly
controlling the queue length would be helpful.

This PR removes the delay-based backpressure implementation in favor of a
simple pair of semaphores: there are a certain number of job and byte
permits available, and the `Guest` has to acquire them before sending a job
to the `Upstairs`.  In other words, writes will be accepted as fast as
possible _until_ we run out of permits; we will then shift to a one-in,
one-out operation mode where jobs have to be completed by the downstairs
before a new job is submitted.  The maximum queue length (either in jobs or
bytes) is well known and set by global constants.

Architecturally, we replace the `BackpressureGuard` with a new
`IOLimitGuard`, which is claimed by the `Guest` and stored in relevant
`BlockOp` variants (then moved into the `DownstairsIO`).  It still uses
RAII to automatically release permits when dropped, and we still manually
drop it early (once a job is complete on a particular downstairs).
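To make the mechanism concrete, here is a rough sketch of the idea
(illustrative only; the real types live in `upstairs/src/io_limits.rs`,
added below):

    use std::sync::Arc;
    use tokio::sync::{OwnedSemaphorePermit, Semaphore};

    /// Owns one job permit plus `bytes` byte permits; dropping the guard
    /// returns the permits to their semaphores (RAII), which is what lets
    /// the next job in.
    struct Guard {
        _job: OwnedSemaphorePermit,
        _bytes: OwnedSemaphorePermit,
    }

    /// Blocks if either pool is exhausted, giving one-in, one-out behavior
    async fn claim(
        jobs: &Arc<Semaphore>,
        byte_pool: &Arc<Semaphore>,
        bytes: u32,
    ) -> Guard {
        let b = byte_pool.clone().acquire_many_owned(bytes).await.unwrap();
        let j = jobs.clone().acquire_owned().await.unwrap();
        Guard { _job: j, _bytes: b }
    }

There is no delay tuning here at all: when the semaphores run dry, `claim`
simply waits until some in-flight job drops its guard.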
---
 cmon/src/main.rs                       |   8 -
 tools/dtrace/single_up_info.d          |   3 +-
 tools/dtrace/sled_upstairs_info.d      |   3 +-
 tools/dtrace/upstairs_info.d           |   5 +-
 upstairs/src/backpressure.rs           | 375 ------------------------
 upstairs/src/client.rs                 |  56 ++--
 upstairs/src/deferred.rs               |  11 +-
 upstairs/src/downstairs.rs             | 163 +++++------
 upstairs/src/dummy_downstairs_tests.rs |   3 +-
 upstairs/src/guest.rs                  | 137 +++++----
 upstairs/src/io_limits.rs              | 158 +++++++++++
 upstairs/src/lib.rs                    |  45 ++-
 upstairs/src/upstairs.rs               | 264 ++++++++++++-----
 13 files changed, 551 insertions(+), 680 deletions(-)
 delete mode 100644 upstairs/src/backpressure.rs
 create mode 100644 upstairs/src/io_limits.rs

diff --git a/cmon/src/main.rs b/cmon/src/main.rs
index 4f3fe1952..1f3595808 100644
--- a/cmon/src/main.rs
+++ b/cmon/src/main.rs
@@ -41,7 +41,6 @@ enum DtraceDisplay {
     Replaced,
     ExtentLiveRepair,
     ExtentLimit,
-    Backpressure,
     NextJobId,
     JobDelta,
     DsDelay,
@@ -61,7 +60,6 @@ impl fmt::Display for DtraceDisplay {
             DtraceDisplay::Replaced => write!(f, "replaced"),
             DtraceDisplay::ExtentLiveRepair => write!(f, "extent_live_repair"),
             DtraceDisplay::ExtentLimit => write!(f, "extent_under_repair"),
-            DtraceDisplay::Backpressure => write!(f, "backpressure"),
             DtraceDisplay::NextJobId => write!(f, "next_job_id"),
             DtraceDisplay::JobDelta => write!(f, "job_delta"),
             DtraceDisplay::DsDelay => write!(f, "ds_delay"),
@@ -229,9 +227,6 @@ fn print_dtrace_header(dd: &[DtraceDisplay]) {
             DtraceDisplay::ExtentLimit => {
                 print!(" {:>4}", "EXTL");
             }
-            DtraceDisplay::Backpressure => {
-                print!(" {:>5}", "BAKPR");
-            }
             DtraceDisplay::NextJobId => {
                 print!(" {:>7}", "NEXTJOB");
             }
@@ -348,9 +343,6 @@ fn print_dtrace_row(d_out: Arg, dd: &[DtraceDisplay], last_job_id: &mut u64) {
             DtraceDisplay::ExtentLimit => {
                 print!(" {:4}", d_out.ds_extent_limit);
             }
-            DtraceDisplay::Backpressure => {
-                print!(" {:>5}", d_out.up_backpressure);
-            }
             DtraceDisplay::NextJobId => {
                 print!(" {:>7}", d_out.next_job_id);
             }
diff --git a/tools/dtrace/single_up_info.d b/tools/dtrace/single_up_info.d
index 0eb1f4e1b..476edb8eb 100755
--- a/tools/dtrace/single_up_info.d
+++ b/tools/dtrace/single_up_info.d
@@ -41,7 +41,7 @@ crucible_upstairs*:::up-status
      * I'm not very happy about this, but if we don't print it all on one
      * line, then multiple sessions will clobber each others output.
      */
-    printf("%8s %17s %17s %17s %5s %5s %9s %5s %10s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s\n",
+    printf("%8s %17s %17s %17s %5s %5s %5s %10s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s\n",

         substr(session_id, 0, 8),
@@ -62,7 +62,6 @@ crucible_upstairs*:::up-status
      * Job ID delta and backpressure
      */
     json(copyinstr(arg1), "ok.next_job_id"),
-    json(copyinstr(arg1), "ok.up_backpressure"),
     json(copyinstr(arg1), "ok.write_bytes_out"),

     /*
diff --git a/tools/dtrace/sled_upstairs_info.d b/tools/dtrace/sled_upstairs_info.d
index 277b8757e..e1ec9b256 100755
--- a/tools/dtrace/sled_upstairs_info.d
+++ b/tools/dtrace/sled_upstairs_info.d
@@ -46,7 +46,7 @@ crucible_upstairs*:::up-status
      * we don't print it all on one line, then multiple sessions will
      * clobber each others output.
      */
-    printf("%5d %8s %17s %17s %17s %5s %5s %9s %5s %10s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s\n",
+    printf("%5d %8s %17s %17s %17s %5s %5s %5s %10s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s\n",
     pid,
     substr(session_id, 0, 8),
@@ -62,7 +62,6 @@ crucible_upstairs*:::up-status

     /* Job ID and backpressure */
     json(copyinstr(arg1), "ok.next_job_id"),
-    json(copyinstr(arg1), "ok.up_backpressure"),
     json(copyinstr(arg1), "ok.write_bytes_out"),

     /* In progress jobs on the work list for each downstairs */
diff --git a/tools/dtrace/upstairs_info.d b/tools/dtrace/upstairs_info.d
index 64294fc2c..5cb4fc178 100755
--- a/tools/dtrace/upstairs_info.d
+++ b/tools/dtrace/upstairs_info.d
@@ -21,7 +21,7 @@ tick-1s
 {
     printf("%6s ", "PID");
     printf("%17s %17s %17s", "DS STATE 0", "DS STATE 1", "DS STATE 2");
-    printf(" %5s %5s %9s %5s", "UPW", "DSW", "JOBID", "BAKPR");
+    printf(" %5s %5s %9s", "UPW", "DSW", "JOBID");
     printf(" %10s", "WRITE_BO");
     printf(" %5s %5s %5s", "IP0", "IP1", "IP2");
     printf(" %5s %5s %5s", "D0", "D1", "D2");
@@ -49,10 +49,9 @@ crucible_upstairs*:::up-status
     printf(" %5s", json(copyinstr(arg1), "ok.ds_count"));

     /*
-     * Job ID and backpressure
+     * Job ID and outstanding bytes
      */
     printf(" %9s", json(copyinstr(arg1), "ok.next_job_id"));
-    printf(" %5s", json(copyinstr(arg1), "ok.up_backpressure"));
     printf(" %10s", json(copyinstr(arg1), "ok.write_bytes_out"));

     /*
diff --git a/upstairs/src/backpressure.rs b/upstairs/src/backpressure.rs
deleted file mode 100644
index feaf4a33f..000000000
--- a/upstairs/src/backpressure.rs
+++ /dev/null
@@ -1,375 +0,0 @@
-// Copyright 2024 Oxide Computer Company
-
-use crate::{IOop, IO_OUTSTANDING_MAX_BYTES, IO_OUTSTANDING_MAX_JOBS};
-use std::{
-    sync::{
-        atomic::{AtomicU64, Ordering},
-        Arc,
-    },
-    time::Duration,
-};
-
-/// Helper struct to contain a set of backpressure counters
-#[derive(Debug)]
-pub struct BackpressureCounters(Arc<BackpressureCountersInner>);
-
-/// Inner data structure for individual backpressure counters
-#[derive(Debug)]
-struct BackpressureCountersInner {
-    /// Number of bytes from `Write` and `WriteUnwritten` operations
-    ///
-    /// This value is used for global backpressure, to avoid buffering too many
-    /// writes (which otherwise return immediately, and are not persistent until
-    /// a flush)
-    write_bytes: AtomicU64,
-
-    /// Number of jobs in the queue
-    ///
-    /// This value is also used for global backpressure
-    // XXX should we only count write jobs here?  Or should we also count read
-    // bytes for global backpressure?  Much to ponder...
-    jobs: AtomicU64,
-
-    /// Number of bytes from `Write`, `WriteUnwritten`, and `Read` operations
-    ///
-    /// This value is used for local backpressure, to keep the 3x Downstairs
-    /// roughly in sync.  Otherwise, the fastest Downstairs will answer all read
-    /// requests, and the others can get arbitrarily far behind.
-    io_bytes: AtomicU64,
-}
-
-/// Guard to automatically decrement backpressure bytes when dropped
-#[derive(Debug)]
-pub struct BackpressureGuard {
-    counter: Arc<BackpressureCountersInner>,
-    write_bytes: u64,
-    io_bytes: u64,
-    // There's also an implicit "1 job" here
-}
-
-impl Drop for BackpressureGuard {
-    fn drop(&mut self) {
-        self.counter
-            .write_bytes
-            .fetch_sub(self.write_bytes, Ordering::Relaxed);
-        self.counter
-            .io_bytes
-            .fetch_sub(self.io_bytes, Ordering::Relaxed);
-        self.counter.jobs.fetch_sub(1, Ordering::Relaxed);
-    }
-}
-
-impl BackpressureGuard {
-    #[cfg(test)]
-    pub fn dummy() -> Self {
-        let counter = Arc::new(BackpressureCountersInner {
-            write_bytes: 0.into(),
-            io_bytes: 0.into(),
-            jobs: 1.into(),
-        });
-        Self {
-            counter,
-            write_bytes: 0,
-            io_bytes: 0,
-        }
-    }
-}
-
-impl BackpressureCounters {
-    pub fn new() -> Self {
-        Self(Arc::new(BackpressureCountersInner {
-            write_bytes: AtomicU64::new(0),
-            io_bytes: AtomicU64::new(0),
-            jobs: AtomicU64::new(0),
-        }))
-    }
-
-    pub fn get_write_bytes(&self) -> u64 {
-        self.0.write_bytes.load(Ordering::Relaxed)
-    }
-
-    pub fn get_io_bytes(&self) -> u64 {
-        self.0.io_bytes.load(Ordering::Relaxed)
-    }
-
-    pub fn get_jobs(&self) -> u64 {
-        self.0.jobs.load(Ordering::Relaxed)
-    }
-
-    /// Stores write / IO bytes (and 1 job) for a pending write
-    #[must_use]
-    pub fn early_write_increment(&mut self, bytes: u64) -> BackpressureGuard {
-        self.0.write_bytes.fetch_add(bytes, Ordering::Relaxed);
-        self.0.io_bytes.fetch_add(bytes, Ordering::Relaxed);
-        self.0.jobs.fetch_add(1, Ordering::Relaxed);
-        BackpressureGuard {
-            counter: self.0.clone(),
-            write_bytes: bytes,
-            io_bytes: bytes,
-            // implicit 1 job
-        }
-    }
-
-    /// Stores write / IO bytes (and 1 job) in the backpressure counters
-    #[must_use]
-    pub fn increment(&mut self, io: &IOop) -> BackpressureGuard {
-        let write_bytes = io.write_bytes();
-        let io_bytes = io.job_bytes();
-        self.0.write_bytes.fetch_add(write_bytes, Ordering::Relaxed);
-        self.0.io_bytes.fetch_add(io_bytes, Ordering::Relaxed);
-        self.0.jobs.fetch_add(1, Ordering::Relaxed);
-        BackpressureGuard {
-            counter: self.0.clone(),
-            write_bytes,
-            io_bytes,
-            // implicit 1 job
-        }
-    }
-}
-
-/// Configuration for host-side backpressure
-///
-/// Backpressure adds an artificial delay to host write messages (which are
-/// otherwise acked immediately, before actually being complete).  The delay is
-/// varied based on two metrics:
-///
-/// - number of write bytes outstanding
-/// - queue length (in jobs)
-///
-/// We compute backpressure delay based on both metrics, then pick the larger of
-/// the two delays.
-#[derive(Copy, Clone, Debug)]
-pub struct BackpressureConfig {
-    pub bytes: BackpressureChannelConfig,
-    pub queue: BackpressureChannelConfig,
-}
-
-impl Default for BackpressureConfig {
-    fn default() -> BackpressureConfig {
-        // `max_value` values below must be higher than `IO_OUTSTANDING_MAX_*`;
-        // otherwise, replaying jobs to a previously-Offline Downstairs could
-        // immediately kick us into the saturated regime, which would be
-        // unfortunate.
-        BackpressureConfig {
-            // Byte-based backpressure
-            bytes: BackpressureChannelConfig {
-                start_value: 50 * 1024u64.pow(2), // 50 MiB
-                max_value: IO_OUTSTANDING_MAX_BYTES * 2,
-                delay_scale: Duration::from_millis(100),
-                delay_max: Duration::from_millis(30_000),
-            },
-
-            // Queue-based backpressure
-            queue: BackpressureChannelConfig {
-                start_value: 500,
-                max_value: IO_OUTSTANDING_MAX_JOBS as u64 * 2,
-                delay_scale: Duration::from_millis(5),
-                delay_max: Duration::from_millis(30_000),
-            },
-        }
-    }
-}
-
-#[derive(Copy, Clone, Debug)]
-pub struct BackpressureChannelConfig {
-    /// When should backpressure start
-    pub start_value: u64,
-    /// Value at which backpressure is saturated
-    pub max_value: u64,
-
-    /// Characteristic scale of backpressure
-    ///
-    /// This scale sets the backpressure delay halfway between `start_value`
-    /// and `max_value`
-    pub delay_scale: Duration,
-
-    /// Maximum delay (returned at `max_value`)
-    pub delay_max: Duration,
-}
-
-#[derive(Copy, Clone, Debug, Eq, PartialEq)]
-pub enum BackpressureAmount {
-    Duration(Duration),
-    Saturated,
-}
-
-impl std::cmp::Ord for BackpressureAmount {
-    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
-        match (self, other) {
-            (BackpressureAmount::Saturated, BackpressureAmount::Saturated) => {
-                std::cmp::Ordering::Equal
-            }
-            (BackpressureAmount::Saturated, _) => std::cmp::Ordering::Greater,
-            (_, BackpressureAmount::Saturated) => std::cmp::Ordering::Less,
-            (
-                BackpressureAmount::Duration(a),
-                BackpressureAmount::Duration(b),
-            ) => a.cmp(b),
-        }
-    }
-}
-
-impl std::cmp::PartialOrd for BackpressureAmount {
-    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
-        Some(self.cmp(other))
-    }
-}
-
-impl BackpressureAmount {
-    /// Converts to a delay in microseconds
-    ///
-    /// The saturated case is marked by `u64::MAX`
-    pub fn as_micros(&self) -> u64 {
-        match self {
-            BackpressureAmount::Duration(t) => t.as_micros() as u64,
-            BackpressureAmount::Saturated => u64::MAX,
-        }
-    }
-}
-
-/// Helper `struct` to store a shared backpressure amount
-///
-/// Under the hood, this stores a value in microseconds in an `AtomicU64`, so it
-/// can be read and written atomically.  `BackpressureAmount::Saturated` is
-/// indicated by `u64::MAX`.
-#[derive(Clone, Debug)]
-pub struct SharedBackpressureAmount(Arc<AtomicU64>);
-
-impl SharedBackpressureAmount {
-    pub fn new() -> Self {
-        Self(Arc::new(AtomicU64::new(0)))
-    }
-
-    pub fn store(&self, b: BackpressureAmount) {
-        let v = match b {
-            BackpressureAmount::Duration(d) => d.as_micros() as u64,
-            BackpressureAmount::Saturated => u64::MAX,
-        };
-        self.0.store(v, Ordering::Relaxed);
-    }
-
-    pub fn load(&self) -> BackpressureAmount {
-        match self.0.load(Ordering::Relaxed) {
-            u64::MAX => BackpressureAmount::Saturated,
-            delay_us => {
-                BackpressureAmount::Duration(Duration::from_micros(delay_us))
-            }
-        }
-    }
-}
-
-impl BackpressureChannelConfig {
-    fn get_backpressure(&self, value: u64) -> BackpressureAmount {
-        // Return a special value if we're saturated
-        if value >= self.max_value {
-            return BackpressureAmount::Saturated;
-        }
-
-        // This ratio starts at 0 (at start_value) and hits 1 when backpressure
-        // should be maxed out.
-        let frac = value.saturating_sub(self.start_value) as f64
-            / (self.max_value - self.start_value) as f64;
-
-        let v = if frac < 0.5 {
-            frac * 2.0
-        } else {
-            1.0 / (2.0 * (1.0 - frac))
-        };
-
-        BackpressureAmount::Duration(
-            self.delay_scale.mul_f64(v.powi(2)).min(self.delay_max),
-        )
-    }
-}
-
-impl BackpressureConfig {
-    pub fn get_backpressure(
-        &self,
-        bytes: u64,
-        jobs: u64,
-    ) -> BackpressureAmount {
-        let bp_bytes = self.bytes.get_backpressure(bytes);
-        let bp_queue = self.queue.get_backpressure(jobs);
-        bp_bytes.max(bp_queue)
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use super::*;
-
-    /// Confirm that replaying the max number of jobs / bytes will not saturate
-    #[test]
-    fn check_outstanding_backpressure() {
-        let cfg = BackpressureConfig::default();
-        let BackpressureAmount::Duration(_) =
-            cfg.get_backpressure(IO_OUTSTANDING_MAX_BYTES, 0)
-        else {
-            panic!("backpressure saturated")
-        };
-
-        let BackpressureAmount::Duration(_) =
-            cfg.get_backpressure(0, IO_OUTSTANDING_MAX_JOBS as u64)
-        else {
-            panic!("backpressure saturated")
-        };
-    }
-
-    #[test]
-    fn check_max_backpressure() {
-        let cfg = BackpressureConfig::default();
-        let BackpressureAmount::Duration(timeout) = cfg
-            .get_backpressure(IO_OUTSTANDING_MAX_BYTES * 2 - 1024u64.pow(2), 0)
-        else {
-            panic!("backpressure saturated")
-        };
-        println!(
-            "max byte-based delay: {}",
-            humantime::format_duration(timeout)
-        );
-        assert!(
-            timeout >= Duration::from_secs(30),
-            "max byte-based backpressure delay is too low;
-            expected >= 30 secs, got {}",
-            humantime::format_duration(timeout)
-        );
-
-        let BackpressureAmount::Duration(timeout) =
-            cfg.get_backpressure(0, IO_OUTSTANDING_MAX_JOBS as u64 * 2 - 1)
-        else {
-            panic!("backpressure saturated")
-        };
-        println!(
-            "max job-based delay: {}",
-            humantime::format_duration(timeout)
-        );
-        assert!(
-            timeout >= Duration::from_secs(30),
-            "max job-based backpressure delay is too low;
-            expected >= 30 secs, got {}",
-            humantime::format_duration(timeout)
-        );
-    }
-
-    #[test]
-    fn check_saturated_backpressure() {
-        let cfg = BackpressureConfig::default();
-        assert!(matches!(
-            cfg.get_backpressure(IO_OUTSTANDING_MAX_BYTES * 2 + 1, 0),
-            BackpressureAmount::Saturated
-        ));
-        assert!(matches!(
-            cfg.get_backpressure(IO_OUTSTANDING_MAX_BYTES * 2, 0),
-            BackpressureAmount::Saturated
-        ));
-
-        assert!(matches!(
-            cfg.get_backpressure(0, IO_OUTSTANDING_MAX_JOBS as u64 * 2 + 1),
-            BackpressureAmount::Saturated
-        ));
-        assert!(matches!(
-            cfg.get_backpressure(0, IO_OUTSTANDING_MAX_JOBS as u64 * 2),
-            BackpressureAmount::Saturated
-        ));
-    }
-}
diff --git a/upstairs/src/client.rs b/upstairs/src/client.rs
index b5dde9d61..59406787f 100644
--- a/upstairs/src/client.rs
+++ b/upstairs/src/client.rs
@@ -1,9 +1,9 @@
 // Copyright 2023 Oxide Computer Company
 use crate::{
-    backpressure::BackpressureCounters, cdt, integrity_hash,
-    live_repair::ExtentInfo, upstairs::UpstairsConfig, upstairs::UpstairsState,
-    ClientIOStateCount, ClientId, CrucibleDecoder, CrucibleError, DownstairsIO,
-    DsState, EncryptionContext, IOState, IOop, JobId, Message, RawReadResponse,
+    cdt, integrity_hash, io_limits::ClientIOLimits, live_repair::ExtentInfo,
+    upstairs::UpstairsConfig, upstairs::UpstairsState, ClientIOStateCount,
+    ClientId, CrucibleDecoder, CrucibleError, DownstairsIO, DsState,
+    EncryptionContext, IOState, IOop, JobId, Message, RawReadResponse,
     ReconcileIO, ReconcileIOState, RegionDefinitionStatus, RegionMetadata,
 };
 use crucible_common::{x509::TLSContext, ExtentId, VerboseTimeout};
@@ -122,11 +122,8 @@ pub(crate) struct DownstairsClient {
     /// Number of bytes associated with each IO state
     io_state_byte_count: ClientIOStateCount,

-    /// Jobs, write bytes, and total IO bytes in this client's queue
-    ///
-    /// These values are used for both global and local (per-client)
-    /// backpressure.
-    pub(crate) backpressure_counters: BackpressureCounters,
+    /// Absolute IO limits for this client
+    io_limits: ClientIOLimits,

     /// UUID for this downstairs region
     ///
@@ -200,6 +197,7 @@ impl DownstairsClient {
         client_id: ClientId,
         cfg: Arc<UpstairsConfig>,
         target_addr: Option<SocketAddr>,
+        io_limits: ClientIOLimits,
         log: Logger,
         tls_context: Option<Arc<crucible_common::x509::TLSContext>>,
     ) -> Self {
@@ -216,6 +214,7 @@ impl DownstairsClient {
                 &log,
             ),
             client_id,
+            io_limits,
             region_uuid: None,
             needs_replay: false,
             negotiation_state: NegotiationState::Start,
@@ -232,7 +231,6 @@ impl DownstairsClient {
             repair_info: None,
             io_state_job_count: ClientIOStateCount::default(),
             io_state_byte_count: ClientIOStateCount::default(),
-            backpressure_counters: BackpressureCounters::new(),
             connection_id: ConnectionId(0),
             client_delay_us,
         }
@@ -256,6 +254,10 @@ impl DownstairsClient {
             cfg,
             client_task: Self::new_dummy_task(false),
             client_id: ClientId::new(0),
+            io_limits: ClientIOLimits::new(
+                crate::IO_OUTSTANDING_MAX_JOBS * 3 / 2,
+                crate::IO_OUTSTANDING_MAX_BYTES as usize * 3 / 2,
+            ),
             region_uuid: None,
             needs_replay: false,
             negotiation_state: NegotiationState::Start,
@@ -272,7 +274,6 @@ impl DownstairsClient {
             repair_info: None,
             io_state_job_count: ClientIOStateCount::default(),
             io_state_byte_count: ClientIOStateCount::default(),
-            backpressure_counters: BackpressureCounters::new(),
             connection_id: ConnectionId(0),
             client_delay_us,
         }
@@ -359,19 +360,24 @@ impl DownstairsClient {
         // Update our bytes-in-flight counter
         if was_running && !is_running {
             // Because the job is no longer running, it shouldn't count for
-            // backpressure.  Remove the backpressure guard for this client,
-            // which decrements backpressure counters on drop.
+            // backpressure or IO limits.  Remove the backpressure guard for
+            // this client, which decrements backpressure counters on drop.
+            job.io_limits.take(&self.client_id);
+        } else if is_running && !was_running {
+            match self.io_limits.try_claim(job.work.job_bytes() as u32) {
+                Ok(g) => {
+                    job.io_limits.insert(self.client_id, g);
+                }
+                Err(e) => {
+                    // We can't handle the case of "running out of permits
+                    // during replay", because waiting for a permit would
+                    // deadlock the worker task.  Log the error and continue.
+                    warn!(
+                        self.log,
+                        "could not claim IO permits when replaying job: {e:?}"
+                    )
+                }
+            }
         }

         old_state
@@ -2245,7 +2251,7 @@ impl DownstairsClient {
     }

     pub(crate) fn total_bytes_outstanding(&self) -> usize {
-        self.backpressure_counters.get_io_bytes() as usize
+        self.io_state_byte_count.in_progress as usize
     }

     /// Returns a unique ID for the current connection, or `None`
diff --git a/upstairs/src/deferred.rs b/upstairs/src/deferred.rs
index cbe43932e..301575a64 100644
--- a/upstairs/src/deferred.rs
+++ b/upstairs/src/deferred.rs
@@ -3,9 +3,8 @@ use std::sync::Arc;

 use crate::{
-    backpressure::BackpressureGuard, client::ConnectionId,
-    upstairs::UpstairsConfig, BlockContext, BlockOp, ClientData, ClientId,
-    ImpactedBlocks, Message, RawWrite,
+    client::ConnectionId, io_limits::IOLimitGuard, upstairs::UpstairsConfig,
+    BlockContext, BlockOp, ClientId, ImpactedBlocks, Message, RawWrite,
 };
 use bytes::BytesMut;
 use crucible_common::{integrity_hash, CrucibleError, RegionDefinition};
@@ -114,7 +113,7 @@ pub(crate) struct DeferredWrite {
     pub data: BytesMut,
     pub is_write_unwritten: bool,
     pub cfg: Arc<UpstairsConfig>,
-    pub guard: ClientData<BackpressureGuard>,
+    pub io_guard: IOLimitGuard,
 }

 /// Result of a deferred `BlockOp`
@@ -135,7 +134,7 @@ pub(crate) struct EncryptedWrite {
     pub data: RawWrite,
     pub impacted_blocks: ImpactedBlocks,
     pub is_write_unwritten: bool,
-    pub guard: ClientData<BackpressureGuard>,
+    pub io_guard: IOLimitGuard,
 }

 impl DeferredWrite {
@@ -183,7 +182,7 @@ impl DeferredWrite {
             data,
             impacted_blocks: self.impacted_blocks,
             is_write_unwritten: self.is_write_unwritten,
-            guard: self.guard,
+            io_guard: self.io_guard,
         }
     }
 }
diff --git a/upstairs/src/downstairs.rs b/upstairs/src/downstairs.rs
index 16b97774c..cc0a94e71 100644
--- a/upstairs/src/downstairs.rs
+++ b/upstairs/src/downstairs.rs
@@ -7,10 +7,10 @@ use std::{
 };

 use crate::{
-    backpressure::BackpressureGuard,
     cdt,
     client::{ClientAction, ClientStopReason, DownstairsClient, EnqueueResult},
     guest::GuestBlockRes,
+    io_limits::{IOLimitGuard, IOLimits},
     live_repair::ExtentInfo,
     stats::DownstairsStatOuter,
     upstairs::{UpstairsConfig, UpstairsState},
@@ -270,6 +270,7 @@ impl Downstairs {
         ds_target: ClientMap<SocketAddr>,
         tls_context: Option<Arc<crucible_common::x509::TLSContext>>,
         stats: DownstairsStatOuter,
+        io_limits: &IOLimits,
         log: Logger,
     ) -> Self {
         let mut clients = [None, None, None];
@@ -278,6 +279,7 @@ impl Downstairs {
                 i,
                 cfg.clone(),
                 ds_target.get(&i).copied(),
+                io_limits.client_limits(i),
                 log.new(o!("client" => i.get().to_string())),
                 tls_context.clone(),
             ));
@@ -349,7 +351,12 @@ impl Downstairs {
         });

         let stats = DownstairsStatOuter::default();
-        let mut ds = Self::new(cfg, ClientMap::new(), None, stats, log);
+        // Build a set of fake IO limits that we won't hit
+        let io_limits = IOLimits::new(u32::MAX as usize, u32::MAX as usize);
+
+        let mut ds =
+            Self::new(cfg, ClientMap::new(), None, stats, &io_limits, log);
+
         // Create a fake repair address so this field is populated.
         for cid in ClientId::iter() {
             ds.clients[cid].repair_addr =
@@ -1115,7 +1122,7 @@ impl Downstairs {
         // reassignment is handled below.
         if finished || (repair.aborting_repair && !have_reserved_jobs) {
             // We're done, submit a final flush!
-            let flush_id = self.submit_flush(None, None);
+            let flush_id = self.submit_flush(None, None, None);

             info!(self.log, "LiveRepair final flush submitted");
             cdt::up__to__ds__flush__start!(|| (flush_id.0));
@@ -1184,7 +1191,7 @@ impl Downstairs {
         let nio = IOop::ExtentLiveNoOp { dependencies: deps };

         cdt::gw__noop__start!(|| (noop_id.0));
-        self.enqueue(noop_id, nio, None, ClientMap::new())
+        self.enqueue(noop_id, nio, None, None)
     }

     #[allow(clippy::too_many_arguments)]
@@ -1200,7 +1207,7 @@ impl Downstairs {

         cdt::gw__repair__start!(|| (repair_id.0, eid.0));

-        self.enqueue(repair_id, repair_io, None, ClientMap::new())
+        self.enqueue(repair_id, repair_io, None, None)
     }

     fn repair_or_noop(
@@ -1389,7 +1396,7 @@ impl Downstairs {

         cdt::gw__reopen__start!(|| (reopen_id.0, eid.0));

-        self.enqueue(reopen_id, reopen_io, None, ClientMap::new())
+        self.enqueue(reopen_id, reopen_io, None, None)
     }

     #[cfg(test)]
@@ -1417,7 +1424,7 @@ impl Downstairs {
             block_size: 512,
         };

-        self.enqueue(ds_id, aread, None, ClientMap::new());
+        self.enqueue(ds_id, aread, None, None);
         ds_id
     }

@@ -1448,7 +1455,7 @@ impl Downstairs {
             iblocks,
             request,
             is_write_unwritten,
-            ClientData::from_fn(|_| BackpressureGuard::dummy()),
+            IOLimitGuard::dummy(),
         )
     }

@@ -1457,7 +1464,7 @@ impl Downstairs {
         blocks: ImpactedBlocks,
         write: RawWrite,
         is_write_unwritten: bool,
-        bp_guard: ClientData<BackpressureGuard>,
+        io_guard: IOLimitGuard,
     ) -> JobId {
         let ds_id = self.next_id();
         if is_write_unwritten {
@@ -1500,7 +1507,7 @@ impl Downstairs {
             ds_id,
             awrite,
             Some(GuestBlockRes::Acked), // writes are always acked
-            bp_guard.into(),
+            Some(io_guard),
         );
         ds_id
     }
@@ -1530,7 +1537,7 @@ impl Downstairs {
         let close_io = self.create_close_io(eid, deps, repair.to_vec());

         cdt::gw__close__start!(|| (close_id.0, eid.0));
-        self.enqueue(close_id, close_io, None, ClientMap::new())
+        self.enqueue(close_id, close_io, None, None)
     }

     /// Get the repair IDs and dependencies for this extent.
@@ -1874,6 +1881,7 @@ impl Downstairs {
         &mut self,
         snapshot_details: Option<SnapshotDetails>,
         res: Option<BlockRes>,
+        io_guard: Option<IOLimitGuard>,
     ) -> JobId {
         let next_id = self.next_id();
         cdt::gw__flush__start!(|| (next_id.0));
@@ -1903,12 +1911,7 @@ impl Downstairs {
         };
         self.pending_barrier += 1;

-        self.enqueue(
-            next_id,
-            flush,
-            res.map(GuestBlockRes::Other),
-            ClientMap::new(),
-        );
+        self.enqueue(next_id, flush, res.map(GuestBlockRes::Other), io_guard);
         next_id
     }

@@ -1964,12 +1967,7 @@ impl Downstairs {
         debug!(self.log, "IO Barrier {next_id} has deps {dependencies:?}");
         self.pending_barrier += 1;

-        self.enqueue(
-            next_id,
-            IOop::Barrier { dependencies },
-            None,
-            ClientMap::new(),
-        );
+        self.enqueue(next_id, IOop::Barrier { dependencies }, None, None);
         next_id
     }

@@ -2048,6 +2046,7 @@ impl Downstairs {
         blocks: ImpactedBlocks,
         data: Buffer,
         res: BlockRes,
+        io_guard: IOLimitGuard,
     ) -> JobId {
         // If there is a live-repair in progress that intersects with this read,
         // then reserve job IDs for those jobs.  We must do this before
@@ -2077,7 +2076,7 @@ impl Downstairs {
         };

         let res = GuestBlockRes::Read(data, res);
-        self.enqueue(ds_id, aread, Some(res), ClientMap::new());
+        self.enqueue(ds_id, aread, Some(res), Some(io_guard));
         ds_id
     }

@@ -2087,7 +2086,7 @@ impl Downstairs {
         blocks: ImpactedBlocks,
         write: RawWrite,
         is_write_unwritten: bool,
-        backpressure_guard: ClientData<BackpressureGuard>,
+        io_guard: IOLimitGuard,
     ) -> JobId {
         // If there is a live-repair in progress that intersects with this write,
         // then reserve job IDs for those jobs.
@@ -2097,7 +2096,7 @@ impl Downstairs {
             blocks,
             write,
             is_write_unwritten,
-            backpressure_guard,
+            io_guard,
         )
     }

@@ -2150,27 +2149,27 @@ impl Downstairs {
         ds_id: JobId,
         io: IOop,
         res: Option<GuestBlockRes>,
-        mut bp_guard: ClientMap<BackpressureGuard>,
+        io_limits: Option<IOLimitGuard>,
     ) {
         let mut skipped = 0;
         let last_repair_extent = self.last_repair_extent();

+        let mut io_limits = io_limits
+            .map(ClientMap::from)
+            .unwrap_or_else(ClientMap::new);
+
         // Send the job to each client!
         let state = ClientData::from_fn(|cid| {
             let client = &mut self.clients[cid];
             let r = client.enqueue(ds_id, &io, last_repair_extent);
             match r {
-                EnqueueResult::Send | EnqueueResult::Hold => {
-                    // Update the per-client backpressure guard
-                    if !bp_guard.contains(&cid) {
-                        let g = client.backpressure_counters.increment(&io);
-                        bp_guard.insert(cid, g);
-                    }
-                    if matches!(r, EnqueueResult::Send) {
-                        self.send(ds_id, io.clone(), cid);
-                    }
+                EnqueueResult::Send => self.send(ds_id, io.clone(), cid),
+                EnqueueResult::Hold => (),
+                EnqueueResult::Skip => {
+                    // Take and drop the ClientIOLimitGuard, freeing up tokens
+                    let _ = io_limits.take(&cid);
+                    skipped += 1;
                 }
-                EnqueueResult::Skip => skipped += 1,
             }
             r.state()
         });
@@ -2194,7 +2193,7 @@ impl Downstairs {
                 res,
                 replay: false,
                 data: None,
-                backpressure_guard: bp_guard,
+                io_limits,
             },
         );
         if acked {
@@ -2731,10 +2730,10 @@ impl Downstairs {
             // **not** when they are retired.  We'll do a sanity check here
             // and print a warning if that's not the case.
             for c in ClientId::iter() {
-                if job.backpressure_guard.contains(&c) {
+                if job.io_limits.contains(&c) {
                     warn!(
                         self.log,
-                        "job {ds_id} had pending backpressure bytes \
+                        "job {ds_id} had pending io limits \
                          for client {c}"
                     );
                     // Backpressure is decremented on drop
@@ -3362,19 +3361,11 @@ impl Downstairs {
     }

     pub(crate) fn write_bytes_outstanding(&self) -> u64 {
+        // XXX this overlaps with io_state_byte_count
         self.clients
             .iter()
             .filter(|c| matches!(c.state(), DsState::Active))
-            .map(|c| c.backpressure_counters.get_write_bytes())
-            .max()
-            .unwrap_or(0)
-    }
-
-    pub(crate) fn jobs_outstanding(&self) -> u64 {
-        self.clients
-            .iter()
-            .filter(|c| matches!(c.state(), DsState::Active))
-            .map(|c| c.backpressure_counters.get_jobs())
+            .map(|c| c.io_state_byte_count().in_progress)
             .max()
             .unwrap_or(0)
     }
@@ -3502,7 +3493,7 @@ impl Downstairs {
                     data,
                 },
                 is_write_unwritten,
-                ClientData::from_fn(|_| BackpressureGuard::dummy()),
+                IOLimitGuard::dummy(),
             )
     }

@@ -3533,7 +3524,7 @@ impl Downstairs {
         let ddef = self.ddef.as_ref().unwrap();
         let block_count = blocks.blocks(ddef).len();
         let buf = Buffer::new(block_count, ddef.block_size() as usize);
-        self.submit_read(blocks, buf, BlockRes::dummy())
+        self.submit_read(blocks, buf, BlockRes::dummy(), IOLimitGuard::dummy())
     }

     /// Create a test downstairs which has all clients Active
@@ -4243,19 +4234,6 @@ impl Downstairs {
         });
     }

-    /// Assign the given number of write bytes to the backpressure counters
-    #[must_use]
-    pub(crate) fn early_write_backpressure(
-        &mut self,
-        bytes: u64,
-    ) -> ClientData<BackpressureGuard> {
-        ClientData::from_fn(|i| {
-            self.clients[i]
-                .backpressure_counters
-                .early_write_increment(bytes)
-        })
-    }
-
     pub(crate) fn set_ddef(&mut self, ddef: RegionDefinition) {
         self.ddef = Some(ddef);
     }
@@ -4264,7 +4242,7 @@ impl Downstairs {
     pub(crate) fn has_live_jobs(&self) -> bool {
         self.clients
             .iter()
-            .any(|c| c.backpressure_counters.get_jobs() > 0)
+            .any(|c| c.io_state_job_count().in_progress > 0)
     }

     /// Returns the per-client state for the given job
@@ -4388,7 +4366,7 @@ pub(crate) mod test {
        let mut ds =
Downstairs::test_default(); ds.force_active(); - let next_id = ds.submit_flush(None, None); + let next_id = ds.submit_flush(None, None, None); assert!(!ds.process_ds_completion( next_id, @@ -4435,6 +4413,7 @@ pub(crate) mod test { snapshot_name: String::from("snap"), }), None, + None, ); assert!(!ds.process_ds_completion( @@ -4480,7 +4459,7 @@ pub(crate) mod test { let mut ds = Downstairs::test_default(); ds.force_active(); - let next_id = ds.submit_flush(None, None); + let next_id = ds.submit_flush(None, None, None); assert!(!ds.process_ds_completion( next_id, @@ -4526,7 +4505,7 @@ pub(crate) mod test { let mut ds = Downstairs::test_default(); ds.force_active(); - let next_id = ds.submit_flush(None, None); + let next_id = ds.submit_flush(None, None, None); assert!(!ds.process_ds_completion( next_id, @@ -4922,7 +4901,7 @@ pub(crate) mod test { assert!(ds.retired_ids.is_empty()); // Create the flush and send it to the downstairs - let flush_id = ds.submit_flush(None, None); + let flush_id = ds.submit_flush(None, None, None); // Simulate completing the flush to downstairs 0 and 1 assert!(!ds.process_ds_completion( @@ -5138,7 +5117,7 @@ pub(crate) mod test { // A flush is required to move work to completed // Create the flush then send it to all downstairs. - let next_id = ds.submit_flush(None, None); + let next_id = ds.submit_flush(None, None, None); // Complete the Flush at each downstairs. assert!(!ds.process_ds_completion( @@ -5204,7 +5183,7 @@ pub(crate) mod test { // A flush is required to move work to completed // Create the flush then send it to all downstairs. - let next_id = ds.submit_flush(None, None); + let next_id = ds.submit_flush(None, None, None); // Send and complete the Flush at each downstairs. for cid in ClientId::iter() { @@ -5281,7 +5260,7 @@ pub(crate) mod test { assert!(ds.retired_ids.is_empty()); // Create the flush IO - let next_id = ds.submit_flush(None, None); + let next_id = ds.submit_flush(None, None, None); // Complete the flush on all three downstairs. assert!(!ds.process_ds_completion( @@ -5376,7 +5355,7 @@ pub(crate) mod test { assert!(ds.retired_ids.is_empty()); // Create and send the flush. - let flush_id = ds.submit_flush(None, None); + let flush_id = ds.submit_flush(None, None, None); // Complete the flush on those downstairs. assert!(!ds.process_ds_completion( @@ -6598,7 +6577,7 @@ pub(crate) mod test { // Create a flush, enqueue it on both the downstairs // and the guest work queues. - let next_id = ds.submit_flush(None, None); + let next_id = ds.submit_flush(None, None, None); let response = || Ok(Default::default()); ds.process_ds_completion( @@ -6644,7 +6623,7 @@ pub(crate) mod test { // Create a flush, enqueue it on both the downstairs // and the guest work queues. - let next_id = ds.submit_flush(None, None); + let next_id = ds.submit_flush(None, None, None); assert!(ds.process_ds_completion( next_id, ClientId::new(0), @@ -6678,7 +6657,7 @@ pub(crate) mod test { // Create a flush, enqueue it on both the downstairs // and the guest work queues. - let next_id = ds.submit_flush(None, None); + let next_id = ds.submit_flush(None, None, None); // DS 1 has a failure, and this won't return true as we don't // have enough success yet to ACK to the guest. @@ -6866,7 +6845,7 @@ pub(crate) mod test { // Perform the flush. 
let next_id = { - let next_id = ds.submit_flush(None, None); + let next_id = ds.submit_flush(None, None, None); // As this DS is failed, it should have been skipped assert_eq!( @@ -7093,7 +7072,7 @@ pub(crate) mod test { assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 0); // Enqueue the flush. - let flush_id = ds.submit_flush(None, None); + let flush_id = ds.submit_flush(None, None, None); assert_eq!( ds.job_states(flush_id), @@ -7535,7 +7514,7 @@ pub(crate) mod test { let read_one = ds.create_and_enqueue_generic_read_eob(); // Finally, add a flush - let flush_one = ds.submit_flush(None, None); + let flush_one = ds.submit_flush(None, None, None); let job = ds.ds_active.get(&write_one).unwrap(); assert_eq!(job.state[ClientId::new(0)], IOState::Skipped); @@ -7661,7 +7640,7 @@ pub(crate) mod test { let read_one = ds.create_and_enqueue_generic_read_eob(); // Finally, add a flush - let flush_one = ds.submit_flush(None, None); + let flush_one = ds.submit_flush(None, None, None); let job = ds.ds_active.get(&write_one).unwrap(); assert_eq!(job.state[ClientId::new(0)], IOState::Skipped); @@ -7845,7 +7824,7 @@ pub(crate) mod test { } // Create a flush. - let flush_one = ds.submit_flush(None, None); + let flush_one = ds.submit_flush(None, None, None); let job = ds.ds_active.get(&flush_one).unwrap(); assert_eq!(job.state[ClientId::new(0)], IOState::Skipped); assert_eq!(job.state[ClientId::new(1)], IOState::Skipped); @@ -7898,7 +7877,7 @@ pub(crate) mod test { let write_one = ds.create_and_enqueue_generic_write_eob(false); // Create a flush - let flush_one = ds.submit_flush(None, None); + let flush_one = ds.submit_flush(None, None, None); // Verify all jobs can be acked (or should have been fast-acked) let write_job = ds.ds_active.get(&write_one).unwrap(); assert!(write_job.acked); @@ -7957,12 +7936,12 @@ pub(crate) mod test { // Create a read, write, flush let read_one = ds.create_and_enqueue_generic_read_eob(); let write_one = ds.create_and_enqueue_generic_write_eob(false); - let flush_one = ds.submit_flush(None, None); + let flush_one = ds.submit_flush(None, None, None); // Create more IOs. let _read_two = ds.create_and_enqueue_generic_read_eob(); let _write_two = ds.create_and_enqueue_generic_write_eob(false); - let _flush_two = ds.submit_flush(None, None); + let _flush_two = ds.submit_flush(None, None, None); // Six jobs have been skipped. 
for cid in ClientId::iter() { @@ -8897,7 +8876,7 @@ pub(crate) mod test { let read_id = ds.submit_read_block(eid, BlockOffset(0)); let write_id = ds.submit_test_write_block(eid, BlockOffset(1), false); - let flush_id = ds.submit_flush(None, None); + let flush_id = ds.submit_flush(None, None, None); let repair_ids = create_and_enqueue_repair_ops(&mut ds, eid); @@ -9014,7 +8993,7 @@ pub(crate) mod test { let eid = ExtentId(1); let repair_ids = create_and_enqueue_repair_ops(&mut ds, eid); - let flush_id = ds.submit_flush(None, None); + let flush_id = ds.submit_flush(None, None, None); assert_eq!(ds.ds_active.len(), 5); @@ -9094,9 +9073,9 @@ pub(crate) mod test { let mut ds = Downstairs::repair_test_one_repair(); - let flush_id1 = ds.submit_flush(None, None); + let flush_id1 = ds.submit_flush(None, None, None); let repair_ids = create_and_enqueue_repair_ops(&mut ds, ExtentId(1)); - let flush_id2 = ds.submit_flush(None, None); + let flush_id2 = ds.submit_flush(None, None, None); assert_eq!(ds.ds_active.len(), 6); @@ -9129,7 +9108,7 @@ pub(crate) mod test { let repair_ids1 = create_and_enqueue_repair_ops(&mut ds, ExtentId(0)); - let flush_id = ds.submit_flush(None, None); + let flush_id = ds.submit_flush(None, None, None); let repair_ids2 = create_and_enqueue_repair_ops(&mut ds, ExtentId(1)); @@ -9622,7 +9601,7 @@ pub(crate) mod test { let read_id = ds.submit_read_block(ExtentId(0), BlockOffset(1)); - let flush_id = ds.submit_flush(None, None); + let flush_id = ds.submit_flush(None, None, None); assert_eq!(ds.ds_active.len(), 3); @@ -9809,7 +9788,7 @@ pub(crate) mod test { }, }); - let flush_id = ds.submit_flush(None, None); + let flush_id = ds.submit_flush(None, None, None); assert_eq!(ds.ds_active.len(), 1); assert!(ds.get_deps(flush_id).is_empty()); diff --git a/upstairs/src/dummy_downstairs_tests.rs b/upstairs/src/dummy_downstairs_tests.rs index 69bda4a19..fb7254fbd 100644 --- a/upstairs/src/dummy_downstairs_tests.rs +++ b/upstairs/src/dummy_downstairs_tests.rs @@ -567,8 +567,7 @@ impl TestHarness { // require triggering a timeout let (g, mut io) = Guest::new(Some(log.clone())); if opts.disable_backpressure { - io.disable_queue_backpressure(); - io.disable_byte_backpressure(); + io.disable_backpressure(); } let guest = Arc::new(g); diff --git a/upstairs/src/guest.rs b/upstairs/src/guest.rs index 077f4c676..da14210fa 100644 --- a/upstairs/src/guest.rs +++ b/upstairs/src/guest.rs @@ -2,13 +2,10 @@ use std::{ net::SocketAddr, sync::atomic::{AtomicU64, Ordering}, - time::Duration, }; use crate::{ - backpressure::{ - BackpressureAmount, BackpressureConfig, SharedBackpressureAmount, - }, + io_limits::{IOLimitView, IOLimits}, BlockIO, BlockOp, BlockOpWaiter, BlockRes, Buffer, ReadBlockContext, ReplaceResult, UpstairsAction, }; @@ -18,8 +15,8 @@ use crucible_protocol::SnapshotDetails; use async_trait::async_trait; use bytes::BytesMut; -use slog::{error, info, warn, Logger}; -use tokio::sync::{mpsc, Mutex}; +use slog::{info, warn, Logger}; +use tokio::sync::mpsc; use tracing::{instrument, span, Level}; use uuid::Uuid; @@ -110,19 +107,8 @@ pub struct Guest { /// it can be read from a `&self` reference. block_size: AtomicU64, - /// Backpressure is implemented as a delay on host write operations - /// - /// It is stored in an `Arc` so that the `GuestIoHandle` can update it from - /// the IO task. 
- backpressure: SharedBackpressureAmount, - - /// Lock held during backpressure delay - /// - /// Without this lock, multiple tasks could submit jobs to the upstairs and - /// wait in parallel, which defeats the purpose of backpressure (since you - /// could send arbitrarily many jobs at high speed by sending them from - /// different tasks). - backpressure_lock: Mutex<()>, + /// View into global IO limits + io_limits: IOLimitView, /// Logger for the guest log: Logger, @@ -150,21 +136,31 @@ impl Guest { // time spent waiting for the queue versus time spent in Upstairs code). let (req_tx, req_rx) = mpsc::channel(500); - let backpressure = SharedBackpressureAmount::new(); + // We have to set limits above `IO_OUTSTANDING_MAX_JOBS/BYTES`: + // an `Offline` downstairs must hit that threshold to transition to + // `Faulted`, so we can't be IO-limited before that point. + let io_limits = IOLimits::new( + crate::IO_OUTSTANDING_MAX_JOBS * 3 / 2, + crate::IO_OUTSTANDING_MAX_BYTES as usize * 3 / 2, + ); + let io_limits_view = io_limits.view(); + let io = GuestIoHandle { req_rx, - backpressure: backpressure.clone(), - backpressure_config: BackpressureConfig::default(), + io_limits, + + #[cfg(test)] + disable_backpressure: false, + log: log.clone(), }; let guest = Guest { req_tx, block_size: AtomicU64::new(0), + io_limits: io_limits_view, - backpressure, - backpressure_lock: Mutex::new(()), log, }; (guest, io) @@ -198,28 +194,6 @@ impl Guest { rx.wait().await } - /// Sleeps for a backpressure-dependent amount, holding the lock - /// - /// If backpressure is saturated, logs and returns an error. - async fn backpressure_sleep(&self) -> Result<(), CrucibleError> { - let bp = self.backpressure.load(); - match bp { - BackpressureAmount::Saturated => { - let err = "write queue is saturated"; - error!(self.log, "{err}"); - Err(CrucibleError::IoError(err.to_owned())) - } - BackpressureAmount::Duration(d) => { - if d > Duration::ZERO { - let _guard = self.backpressure_lock.lock().await; - tokio::time::sleep(d).await; - drop(_guard); - } - Ok(()) - } - } - } - #[cfg(test)] pub async fn downstairs_state( &self, @@ -353,11 +327,20 @@ impl BlockIO for Guest { assert_eq!(chunk.len() as u64 % bs, 0); let offset_change = chunk.len() as u64 / bs; + let io_guard = + self.io_limits.claim(chunk.len() as u32).await.map_err( + |e| { + CrucibleError::IoError(format!( + "could not get IO guard for Read: {e:?}" + )) + }, + )?; let (rx, done) = BlockOpWaiter::pair(); let rio = BlockOp::Read { offset, data: chunk, done, + io_guard, }; // Our return value always includes the buffer, so we can splice it @@ -405,9 +388,8 @@ impl BlockIO for Guest { } // We split writes into chunks to bound the maximum (typical) latency of - // any single `BlockOp::Write`. Otherwise, the host could send writes - // which are large enough that our maximum backpressure delay wouldn't - // compensate for them. + // any single `BlockOp::Write`. This makes the system's performance + // characteristics easier to think about. 
     const MDTS: usize = 1024 * 1024; // 1 MiB

     while !data.is_empty() {
@@ -415,13 +397,19 @@ impl BlockIO for Guest {
             assert_eq!(buf.len() as u64 % bs, 0);
             let offset_change = buf.len() as u64 / bs;

-            self.backpressure_sleep().await?;
+            let io_guard =
+                self.io_limits.claim(buf.len() as u32).await.map_err(|e| {
+                    CrucibleError::IoError(format!(
+                        "could not get IO guard for Write: {e:?}"
+                    ))
+                })?;

             let reply = self
                 .send_and_wait(|done| BlockOp::Write {
                     offset,
                     data: buf,
                     done,
+                    io_guard,
                 })
                 .await;
             reply?;
@@ -442,11 +430,17 @@ impl BlockIO for Guest {
             return Ok(());
         }

-        self.backpressure_sleep().await?;
+        let io_guard =
+            self.io_limits.claim(data.len() as u32).await.map_err(|e| {
+                CrucibleError::IoError(format!(
+                    "could not get IO guard for WriteUnwritten: {e:?}"
+                ))
+            })?;
         self.send_and_wait(|done| BlockOp::WriteUnwritten {
             offset,
             data,
             done,
+            io_guard,
         })
         .await
     }
@@ -455,9 +449,15 @@ impl BlockIO for Guest {
         &self,
         snapshot_details: Option<SnapshotDetails>,
     ) -> Result<(), CrucibleError> {
+        let io_guard = self.io_limits.claim(0).await.map_err(|e| {
+            CrucibleError::IoError(format!(
+                "could not get IO guard for flush: {e:?}"
+            ))
+        })?;
         self.send_and_wait(|done| BlockOp::Flush {
             snapshot_details,
             done,
+            io_guard,
         })
         .await
     }
@@ -514,14 +514,15 @@ pub struct GuestIoHandle {
     /// Queue to receive new blockreqs
     req_rx: mpsc::Receiver<BlockOp>,

-    /// Current backpressure (shared with the `Guest`)
-    backpressure: SharedBackpressureAmount,
-
-    /// Backpressure configuration, as a starting point and max delay
-    backpressure_config: BackpressureConfig,
+    /// IO limiting (shared with the `Guest`)
+    io_limits: IOLimits,

     /// Log handle, mainly to pass it into the [`Upstairs`]
     pub log: Logger,
+
+    /// Flag to disable backpressure during unit tests
+    #[cfg(test)]
+    disable_backpressure: bool,
 }

 impl GuestIoHandle {
@@ -536,29 +537,17 @@ impl GuestIoHandle {
     }

     #[cfg(test)]
-    pub fn disable_queue_backpressure(&mut self) {
-        self.backpressure_config.queue.delay_scale = Duration::ZERO;
+    pub fn disable_backpressure(&mut self) {
+        self.disable_backpressure = true;
     }

     #[cfg(test)]
-    pub fn disable_byte_backpressure(&mut self) {
-        self.backpressure_config.bytes.delay_scale = Duration::ZERO;
-    }
-
-    #[cfg(test)]
-    pub fn is_queue_backpressure_disabled(&self) -> bool {
-        self.backpressure_config.queue.delay_scale == Duration::ZERO
-    }
-
-    /// Set `self.backpressure` based on outstanding IO ratio
-    pub fn set_backpressure(&self, bytes: u64, jobs: u64) {
-        let bp = self.backpressure_config.get_backpressure(bytes, jobs);
-        self.backpressure.store(bp);
+    pub fn is_backpressure_disabled(&self) -> bool {
+        self.disable_backpressure
     }

-    /// Looks up current backpressure
-    pub fn get_backpressure(&self) -> BackpressureAmount {
-        self.backpressure.load()
+    pub(crate) fn io_limits(&self) -> &IOLimits {
+        &self.io_limits
     }
 }
diff --git a/upstairs/src/io_limits.rs b/upstairs/src/io_limits.rs
new file mode 100644
index 000000000..14958f2a2
--- /dev/null
+++ b/upstairs/src/io_limits.rs
@@ -0,0 +1,158 @@
+// Copyright 2024 Oxide Computer Company
+use crate::{ClientData, ClientId, ClientMap};
+use std::sync::Arc;
+use tokio::sync::{
+    AcquireError, OwnedSemaphorePermit, Semaphore, TryAcquireError,
+};
+
+/// Per-client IO limits
+#[derive(Clone, Debug)]
+pub struct ClientIOLimits {
+    /// Semaphore to claim IO bytes on behalf of a job
+    io_bytes: Arc<Semaphore>,
+
+    /// Semaphore to claim individual IO jobs
+    jobs: Arc<Semaphore>,
+}
+
+impl ClientIOLimits {
+    /// Builds a new `ClientIOLimits` object with the given limits
+    pub fn new(max_jobs: usize, max_io_bytes: usize) -> Self {
+        ClientIOLimits {
+            io_bytes: Semaphore::new(max_io_bytes).into(),
+            jobs: Semaphore::new(max_jobs).into(),
+        }
+    }
+
+    /// Claims a certain number of bytes (and one job)
+    ///
+    /// This function waits until the given resources are available; as such, it
+    /// should not be called from the same task which is processing jobs (since
+    /// that could create a deadlock).
+    async fn claim(
+        &self,
+        bytes: u32,
+    ) -> Result<ClientIOLimitGuard, AcquireError> {
+        let io_bytes = self.io_bytes.clone().acquire_many_owned(bytes).await?;
+        let jobs = self.jobs.clone().acquire_owned().await?;
+        Ok(ClientIOLimitGuard { io_bytes, jobs })
+    }
+
+    /// Tries to claim a certain number of bytes (and one job)
+    pub fn try_claim(
+        &self,
+        bytes: u32,
+    ) -> Result<ClientIOLimitGuard, TryAcquireError> {
+        let io_bytes = self.io_bytes.clone().try_acquire_many_owned(bytes)?;
+        let jobs = self.jobs.clone().try_acquire_owned()?;
+        Ok(ClientIOLimitGuard { io_bytes, jobs })
+    }
+}
+
+/// Read-write handle for IO limits across all 3x clients
+#[derive(Clone, Debug)]
+pub struct IOLimits(ClientData<ClientIOLimits>);
+
+impl IOLimits {
+    /// Builds a new set of IO limits
+    pub fn new(max_jobs: usize, max_io_bytes: usize) -> Self {
+        Self(ClientData::from_fn(|_i| {
+            ClientIOLimits::new(max_jobs, max_io_bytes)
+        }))
+    }
+
+    /// Returns a per-client IO limit handle
+    ///
+    /// This handle shares permits with the parent
+    pub fn client_limits(&self, i: ClientId) -> ClientIOLimits {
+        self.0[i].clone()
+    }
+
+    /// Returns a view handle for the IO limits
+    pub fn view(&self) -> IOLimitView {
+        IOLimitView(self.clone())
+    }
+
+    /// Try to claim some number of bytes (and one job)
+    ///
+    /// Returns `Err((ClientId, e))` if any of the claims fail
+    pub fn try_claim(
+        &self,
+        bytes: u32,
+    ) -> Result<IOLimitGuard, (ClientId, TryAcquireError)> {
+        let mut out = ClientData::from_fn(|_| None);
+        for i in ClientId::iter() {
+            out[i] = Some(self.0[i].try_claim(bytes).map_err(|e| (i, e))?);
+        }
+
+        Ok(IOLimitGuard(out.map(Option::unwrap)))
+    }
+}
+
+/// View into global IO limits
+///
+/// This is equivalent to an [`IOLimits`], but exposes a different API.  It
+/// should be owned by a separate task, to avoid deadlocks when trying to claim
+/// resources.
+#[derive(Clone, Debug)]
+pub struct IOLimitView(IOLimits);
+
+impl IOLimitView {
+    /// Claim some number of bytes (and one job)
+    ///
+    /// This function waits until the given resources are available; as such, it
+    /// should not be called from the same task which is processing jobs (since
+    /// that could create a deadlock).
+    pub async fn claim(
+        &self,
+        bytes: u32,
+    ) -> Result<IOLimitGuard, AcquireError> {
+        let mut out = ClientData::from_fn(|_| None);
+        let lim = &self.0;
+        for i in ClientId::iter() {
+            out[i] = Some(lim.0[i].claim(bytes).await?);
+        }
+        Ok(IOLimitGuard(out.map(Option::unwrap)))
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// Handle owning some amount of per-client IO
+///
+/// The IO permits are released when this handle is dropped
+#[derive(Debug)]
+pub struct ClientIOLimitGuard {
+    #[expect(unused)]
+    io_bytes: OwnedSemaphorePermit,
+    #[expect(unused)]
+    jobs: OwnedSemaphorePermit,
+}
+
+impl ClientIOLimitGuard {
+    #[cfg(test)]
+    pub fn dummy() -> Self {
+        let a = Arc::new(Semaphore::new(1));
+        let b = Arc::new(Semaphore::new(1));
+        let io_bytes = a.try_acquire_owned().unwrap();
+        let jobs = b.try_acquire_owned().unwrap();
+        ClientIOLimitGuard { io_bytes, jobs }
+    }
+}
+
+/// Handle which stores IO limit guards for all 3x clients
+#[derive(Debug)]
+pub struct IOLimitGuard(ClientData<ClientIOLimitGuard>);
+
+impl From<IOLimitGuard> for ClientMap<ClientIOLimitGuard> {
+    fn from(i: IOLimitGuard) -> Self {
+        i.0.into()
+    }
+}
+
+impl IOLimitGuard {
+    #[cfg(test)]
+    pub fn dummy() -> Self {
+        Self(ClientData::from_fn(|_i| ClientIOLimitGuard::dummy()))
+    }
+}
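For reviewers, a sketch of how these pieces fit together (hypothetical
calling code, not part of this diff):

    // The `Guest` holds an `IOLimitView` and must claim permits (on all
    // three clients) before submitting a job; this waits when full.
    let guard: IOLimitGuard = view.claim(bytes).await?;
    //   ... store `guard` in the `BlockOp`, then in the `DownstairsIO` ...

    // Internally-generated work (e.g. automatic flushes) uses `try_claim`
    // instead, since the upstairs task must never block on itself.
    let maybe_guard = io_limits.try_claim(0).ok();

    // Completing a job on one downstairs drops just that client's
    // `ClientIOLimitGuard`; the rest are released when the job is retired.

These are the same call patterns used in `guest.rs` (above) and
`upstairs.rs` (below).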
diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs
index bc511deb5..4e798cfce 100644
--- a/upstairs/src/lib.rs
+++ b/upstairs/src/lib.rs
@@ -48,9 +48,6 @@ pub use in_memory::InMemoryBlockIO;
 pub mod block_io;
 pub use block_io::{FileBlockIO, ReqwestBlockIO};

-pub(crate) mod backpressure;
-use backpressure::BackpressureGuard;
-
 pub mod block_req;
 pub(crate) use block_req::{BlockOpWaiter, BlockRes};

@@ -86,17 +83,20 @@ mod downstairs;
 mod upstairs;
 use upstairs::{UpCounters, UpstairsAction};

+mod io_limits;
+use io_limits::IOLimitGuard;
+
 /// Max number of write bytes between the upstairs and an offline downstairs
 ///
-/// If we exceed this value, the upstairs will give
-/// up and mark the offline downstairs as faulted.
-const IO_OUTSTANDING_MAX_BYTES: u64 = 1024 * 1024 * 1024; // 1 GiB
+/// If we exceed this value, the upstairs will give up and mark the offline
+/// downstairs as faulted.
+const IO_OUTSTANDING_MAX_BYTES: u64 = 50 * 1024 * 1024; // 50 MiB

 /// Max number of outstanding IOs between the upstairs and an offline downstairs
 ///
 /// If we exceed this value, the upstairs will give up and mark that offline
 /// downstairs as faulted.
-const IO_OUTSTANDING_MAX_JOBS: usize = 10000;
+pub const IO_OUTSTANDING_MAX_JOBS: usize = 1000;

 /// Maximum of bytes to cache from complete (but un-flushed) IO
 ///
@@ -448,6 +448,11 @@
         ])
     }

+    /// Builds a new `ClientData` by applying a function to each item
+    pub fn map<U, F: Fn(T) -> U>(self, f: F) -> ClientData<U> {
+        ClientData(self.0.map(f))
+    }
+
     #[cfg(test)]
     pub fn get(&self) -> &[T; 3] {
         &self.0
@@ -926,11 +931,7 @@ struct DownstairsIO {
     /// consistency checking with subsequent replies.
     data: Option<RawReadResponse>,

-    /// Handle for this job's contribution to guest backpressure
-    ///
-    /// Each of these guard handles will automatically decrement the
-    /// backpressure count for their respective Downstairs when dropped.
-    backpressure_guard: ClientMap<BackpressureGuard>,
+    io_limits: ClientMap<ClientIOLimitGuard>,
 }

 impl DownstairsIO {
@@ -1315,16 +1316,6 @@
             _ => 0,
         }
     }
-
-    /// Returns the number of bytes written
-    fn write_bytes(&self) -> u64 {
-        match &self {
-            IOop::Write { data, .. } | IOop::WriteUnwritten { data, .. } => {
-                data.len() as u64
-            }
-            _ => 0,
-        }
-    }
 }

 #[allow(clippy::derive_partial_eq_without_eq)]
@@ -1502,20 +1493,24 @@ pub(crate) enum BlockOp {
         offset: BlockIndex,
         data: Buffer,
         done: BlockRes,
+        io_guard: IOLimitGuard,
     },
     Write {
         offset: BlockIndex,
         data: BytesMut,
         done: BlockRes,
+        io_guard: IOLimitGuard,
     },
     WriteUnwritten {
         offset: BlockIndex,
         data: BytesMut,
         done: BlockRes,
+        io_guard: IOLimitGuard,
     },
     Flush {
         snapshot_details: Option<SnapshotDetails>,
         done: BlockRes,
+        io_guard: IOLimitGuard,
     },
     GoActive {
         done: BlockRes,
@@ -1584,8 +1579,6 @@ pub struct Arg {
     pub up_counters: UpCounters,
     /// Next JobID
     pub next_job_id: JobId,
-    /// Backpressure value
-    pub up_backpressure: u64,
    /// Jobs on the downstairs work queue.
     pub ds_count: u32,
     /// Number of write bytes in flight
@@ -1664,7 +1657,7 @@ pub fn up_main(
     };

     #[cfg(test)]
-    let disable_backpressure = guest.is_queue_backpressure_disabled();
+    let disable_backpressure = guest.is_backpressure_disabled();

     /*
      * Build the Upstairs struct that we use to share data between
@@ -1675,7 +1668,7 @@ pub fn up_main(

     #[cfg(test)]
     if disable_backpressure {
-        up.disable_client_backpressure();
+        up.downstairs.disable_client_backpressure();
     }

     if let Some(pr) = producer_registry {
diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs
index a19a504fa..31a518b7e 100644
--- a/upstairs/src/upstairs.rs
+++ b/upstairs/src/upstairs.rs
@@ -10,6 +10,7 @@ use crate::{
     },
     downstairs::{Downstairs, DownstairsAction},
     extent_from_offset,
+    io_limits::IOLimitGuard,
     stats::UpStatOuter,
     BlockOp, BlockRes, Buffer, ClientId, ClientMap, CrucibleOpts, DsState,
     EncryptionContext, GuestIoHandle, Message, RegionDefinition,
@@ -172,7 +173,7 @@ impl UpCounters {
 /// - Ack all ackable jobs to the guest
 /// - Step through the live-repair state machine (if it's running)
 /// - Check for client-side deactivation (if it's pending)
-/// - Set backpressure time in the guest
+/// - Set backpressure time in the clients
 ///
 /// Keeping the `Upstairs` "clean" through this invariant maintenance makes it
 /// easier to think about its state, because it's guaranteed to be clean when we
@@ -389,6 +390,7 @@ impl Upstairs {
             ds_target,
             tls_context,
             stats.ds_stats(),
+            guest.io_limits(),
             log.new(o!("" => "downstairs")),
         );
         let flush_timeout_secs = opt.flush_timeout.unwrap_or(0.5);
@@ -423,11 +425,6 @@ impl Upstairs {
         }
     }

-    #[cfg(test)]
-    pub(crate) fn disable_client_backpressure(&mut self) {
-        self.downstairs.disable_client_backpressure();
-    }
-
     /// Build an Upstairs for simple tests
     #[cfg(test)]
     pub fn test_default(ddef: Option<RegionDefinition>) -> Self {
@@ -570,7 +567,8 @@ impl Upstairs {
                 .counters
                 .action_flush_check));
             if self.need_flush {
-                self.submit_flush(None, None);
+                let io_guard = self.try_acquire_io(0);
+                self.submit_flush(None, None, io_guard);
             }
             self.flush_deadline = Instant::now() + self.flush_interval;
         }
@@ -661,7 +659,7 @@ impl Upstairs {

         // For now, check backpressure after every event.  We may want to make
         // this more nuanced in the future.
-        self.set_backpressure();
+        self.downstairs.set_client_backpressure();

         // We do this last because some of the code above can be slow
         // (especially during debug builds), and we don't want to set our flush
@@ -671,6 +669,33 @@ impl Upstairs {
         }
     }

+    /// Attempts to acquire permits to perform an IO job with the given bytes
+    ///
+    /// Upon failure, logs an error and returns `None`.
+    ///
+    /// This function is used by messages generated internally to the Upstairs
+    /// for best-effort IO limiting.  If the message would exceed our available
+    /// permits, it's still allowed (because to do otherwise would deadlock the
+    /// upstairs task).  In other words, internally generated messages can limit
+    /// guest IO work, but not the other way around
+    fn try_acquire_io(&self, bytes: usize) -> Option<IOLimitGuard> {
+        let Ok(bytes) = u32::try_from(bytes) else {
+            warn!(self.log, "too many bytes for try_acquire_io");
+            return None;
+        };
+        match self.guest.io_limits().try_claim(bytes) {
+            Ok(v) => Some(v),
+            Err((i, e)) => {
+                warn!(
+                    self.log,
+                    "could not apply IO limits to upstairs work: \
+                     client {i} returned {e:?}"
+                );
+                None
+            }
+        }
+    }
+
     /// Helper function to await all deferred block requests
     ///
     /// This is only useful in tests because it **only** processes deferred
@@ -726,7 +751,6 @@ impl Upstairs {
             up_count: self.downstairs.gw_active.len() as u32,
             up_counters: self.counters,
             next_job_id: self.downstairs.peek_next_id(),
-            up_backpressure: self.guest.get_backpressure().as_micros(),
             write_bytes_out: self.downstairs.write_bytes_outstanding(),
             ds_count: self.downstairs.active_count() as u32,
             ds_state: self.downstairs.collect_stats(|c| c.state()),
@@ -919,11 +943,21 @@ impl Upstairs {
         match op {
             // All Write operations are deferred, because they will offload
             // encryption to a separate thread pool.
-            BlockOp::Write { offset, data, done } => {
-                self.submit_deferred_write(offset, data, done, false);
+            BlockOp::Write {
+                offset,
+                data,
+                done,
+                io_guard,
+            } => {
+                self.submit_deferred_write(offset, data, done, false, io_guard);
             }
-            BlockOp::WriteUnwritten { offset, data, done } => {
-                self.submit_deferred_write(offset, data, done, true);
+            BlockOp::WriteUnwritten {
+                offset,
+                data,
+                done,
+                io_guard,
+            } => {
+                self.submit_deferred_write(offset, data, done, true, io_guard);
             }
             // If we have any deferred requests in the FuturesOrdered, then we
             // have to keep using it for subsequent requests (even ones that are
@@ -1090,15 +1124,19 @@ impl Upstairs {
                 done.send_ok(self.show_all_work());
             }

-            BlockOp::Read { offset, data, done } => {
-                self.submit_read(offset, data, done)
-            }
+            BlockOp::Read {
+                offset,
+                data,
+                done,
+                io_guard,
+            } => self.submit_read(offset, data, done, io_guard),
             BlockOp::Write { .. } | BlockOp::WriteUnwritten { .. } => {
                 panic!("writes must always be deferred")
             }
             BlockOp::Flush {
                 snapshot_details,
                 done,
+                io_guard,
             } => {
                 /*
                  * Submit for read and write both check if the upstairs is
@@ -1111,7 +1149,7 @@ impl Upstairs {
                     done.send_err(CrucibleError::UpstairsInactive);
                     return;
                 }
-                self.submit_flush(Some(done), snapshot_details);
+                self.submit_flush(Some(done), snapshot_details, Some(io_guard));
             }
             BlockOp::ReplaceDownstairs { id, old, new, done } => {
                 let r = self.downstairs.replace(id, old, new, &self.state);
@@ -1251,7 +1289,8 @@ impl Upstairs {
         }
         if !self.downstairs.can_deactivate_immediately() {
             debug!(self.log, "not ready to deactivate; submitting final flush");
-            self.submit_flush(None, None);
+            let io_guard = self.try_acquire_io(0);
+            self.submit_flush(None, None, io_guard);
         } else {
             debug!(self.log, "ready to deactivate right away");
             // Deactivation is handled in the invariant-checking portion of
@@ -1265,6 +1304,7 @@ impl Upstairs {
         &mut self,
         res: Option<BlockRes>,
         snapshot_details: Option<SnapshotDetails>,
+        io_guard: Option<IOLimitGuard>,
     ) {
         // Notice that unlike submit_read and submit_write, we do not check for
@@ -1305,8 +1347,7 @@
         offset: BlockIndex,
         data: Buffer,
     ) {
-        let br = BlockRes::dummy();
-        self.submit_read(offset, data, br)
+        self.submit_read(offset, data, BlockRes::dummy(), IOLimitGuard::dummy())
     }

     /// Submit a read job to the downstairs
@@ -1315,6 +1356,7 @@
         offset: BlockIndex,
         data: Buffer,
         res: BlockRes,
+        io_guard: IOLimitGuard,
     ) {
         if !self.guest_io_ready() {
             res.send_err((data, CrucibleError::UpstairsInactive));
@@ -1353,7 +1395,9 @@
          * Grab this ID after extent_from_offset: in case of Err we don't
          * want to create a gap in the IDs.
          */
-        let ds_id = self.downstairs.submit_read(impacted_blocks, data, res);
+        let ds_id =
+            self.downstairs
+                .submit_read(impacted_blocks, data, res, io_guard);
         cdt::up__to__ds__read__start!(|| (ds_id.0));
     }

@@ -1373,6 +1417,7 @@
             data,
             BlockRes::dummy(),
             is_write_unwritten,
+            IOLimitGuard::dummy(),
         ) {
             self.submit_write(DeferredWrite::run(w))
         }
@@ -1389,14 +1434,19 @@
         data: BytesMut,
         res: BlockRes,
         is_write_unwritten: bool,
+        io_guard: IOLimitGuard,
     ) {
         // It's possible for the write to be invalid out of the gate, in which
         // case `compute_deferred_write` replies to the `res` itself and returns
         // `None`. Otherwise, we have to store a future to process the write
         // result.
-        if let Some(w) =
-            self.compute_deferred_write(offset, data, res, is_write_unwritten)
-        {
+        if let Some(w) = self.compute_deferred_write(
+            offset,
+            data,
+            res,
+            is_write_unwritten,
+            io_guard,
+        ) {
             let should_defer = !self.deferred_ops.is_empty()
                 || w.data.len() > MIN_DEFER_SIZE_BYTES as usize;
             if should_defer {
@@ -1418,6 +1468,7 @@
         data: BytesMut,
         res: BlockRes,
         is_write_unwritten: bool,
+        io_guard: IOLimitGuard,
     ) -> Option<DeferredWrite> {
         if !self.guest_io_ready() {
             res.send_err(CrucibleError::UpstairsInactive);
@@ -1445,8 +1496,6 @@
         let impacted_blocks =
             extent_from_offset(&ddef, offset, ddef.bytes_to_blocks(data.len()));

-        let guard = self.downstairs.early_write_backpressure(data.len() as u64);
-
         // Fast-ack, pretending to be done immediately operations
         res.send_ok(());
@@ -1456,7 +1505,7 @@
             data,
             is_write_unwritten,
             cfg: self.cfg.clone(),
-            guard,
+            io_guard,
         })
     }

@@ -1476,7 +1525,7 @@
             write.impacted_blocks,
             write.data,
             write.is_write_unwritten,
-            write.guard,
+            write.io_guard,
         );

         if write.is_write_unwritten {
@@ -2024,16 +2073,6 @@
         self.downstairs.reinitialize(client_id, &self.state);
     }

-    /// Sets both guest and per-client backpressure
-    fn set_backpressure(&self) {
-        self.guest.set_backpressure(
-            self.downstairs.write_bytes_outstanding(),
-            self.downstairs.jobs_outstanding(),
-        );
-
-        self.downstairs.set_client_backpressure();
-    }
-
     /// Returns the `RegionDefinition`
     ///
     /// # Panics
@@ -2355,7 +2394,7 @@ pub(crate) mod test {
         );

         // op 1
-        upstairs.submit_flush(None, None);
+        upstairs.submit_flush(None, None, None);

         // op 2
         upstairs.submit_dummy_write(
@@ -2400,7 +2439,7 @@
         }

         // op 3
-        upstairs.submit_flush(None, None);
+        upstairs.submit_flush(None, None, None);

         // ops 4 to 6
         for i in 3..6 {
@@ -2731,7 +2770,7 @@
         );

         // op 1
-
upstairs.submit_flush(None, None); + upstairs.submit_flush(None, None, None); // op 2 upstairs.submit_dummy_read(BlockIndex(0), Buffer::new(2, 512)); @@ -2758,11 +2797,11 @@ pub(crate) mod test { let mut upstairs = make_upstairs(); upstairs.force_active().unwrap(); - upstairs.submit_flush(None, None); + upstairs.submit_flush(None, None, None); - upstairs.submit_flush(None, None); + upstairs.submit_flush(None, None, None); - upstairs.submit_flush(None, None); + upstairs.submit_flush(None, None, None); let jobs = upstairs.downstairs.get_all_jobs(); assert_eq!(jobs.len(), 3); @@ -2792,7 +2831,7 @@ pub(crate) mod test { upstairs.force_active().unwrap(); // op 0 - upstairs.submit_flush(None, None); + upstairs.submit_flush(None, None, None); // ops 1 to 2 for i in 0..2 { @@ -2804,7 +2843,7 @@ pub(crate) mod test { } // op 3 - upstairs.submit_flush(None, None); + upstairs.submit_flush(None, None, None); // ops 4 to 6 for i in 0..3 { @@ -2816,7 +2855,7 @@ pub(crate) mod test { } // op 7 - upstairs.submit_flush(None, None); + upstairs.submit_flush(None, None, None); let jobs = upstairs.downstairs.get_all_jobs(); assert_eq!(jobs.len(), 8); @@ -3270,7 +3309,7 @@ pub(crate) mod test { upstairs.submit_dummy_read(BlockIndex(95), Buffer::new(2, 512)); // op 1 - upstairs.submit_flush(None, None); + upstairs.submit_flush(None, None, None); // op 2 upstairs.submit_dummy_write( @@ -3489,10 +3528,21 @@ pub(crate) mod test { let offset = BlockIndex(7); let data = BytesMut::from([1; 512].as_slice()); let (_write_res, done) = BlockOpWaiter::pair(); + let io_guard = IOLimitGuard::dummy(); let op = if is_write_unwritten { - BlockOp::WriteUnwritten { offset, data, done } + BlockOp::WriteUnwritten { + offset, + data, + done, + io_guard, + } } else { - BlockOp::Write { offset, data, done } + BlockOp::Write { + offset, + data, + done, + io_guard, + } }; up.apply(UpstairsAction::Guest(op)); up.await_deferred_ops().await; @@ -3605,7 +3655,13 @@ pub(crate) mod test { let data = Buffer::new(1, 512); let offset = BlockIndex(7); let (_res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Read { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Read { + offset, + data, + done, + io_guard, + })); // fake read response from downstairs that will successfully decrypt let mut data = Vec::from([1u8; 512]); @@ -3653,7 +3709,13 @@ pub(crate) mod test { let data = Buffer::new(blocks, 512); let offset = BlockIndex(7); let (_res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Read { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Read { + offset, + data, + done, + io_guard, + })); let mut data = Vec::from([1u8; 512]); @@ -3711,7 +3773,13 @@ pub(crate) mod test { let data = Buffer::new(blocks, 512); let offset = BlockIndex(7); let (_res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Read { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Read { + offset, + data, + done, + io_guard, + })); // fake read response from downstairs that will fail decryption let mut data = Vec::from([1u8; 512]); @@ -3787,7 +3855,13 @@ pub(crate) mod test { let data = Buffer::new(1, 512); let offset = BlockIndex(7); let (_res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Read { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Read { + 
offset, + data, + done, + io_guard, + })); // fake read response from downstairs that will fail decryption let mut data = Vec::from([1u8; 512]); @@ -3850,7 +3924,13 @@ pub(crate) mod test { let data = Buffer::new(1, 512); let offset = BlockIndex(7); let (_res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Read { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Read { + offset, + data, + done, + io_guard, + })); // fake read response from downstairs that will fail integrity hash // check @@ -3895,7 +3975,13 @@ pub(crate) mod test { let data = Buffer::new(1, 512); let offset = BlockIndex(7); let (_res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Read { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Read { + offset, + data, + done, + io_guard, + })); let data = BytesMut::from([1u8; 512].as_slice()); let hash = integrity_hash(&[&data]); @@ -3956,7 +4042,13 @@ pub(crate) mod test { let data = Buffer::new(1, 512); let offset = BlockIndex(7); let (_res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Read { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Read { + offset, + data, + done, + io_guard, + })); for client_id in [ClientId::new(0), ClientId::new(1)] { let data = BytesMut::from([1u8; 512].as_slice()); @@ -4019,7 +4111,13 @@ pub(crate) mod test { let data = Buffer::new(1, 512); let offset = BlockIndex(7); let (_res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Read { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Read { + offset, + data, + done, + io_guard, + })); let data = BytesMut::from([1u8; 512].as_slice()); let hash = integrity_hash(&[&data]); @@ -4079,7 +4177,13 @@ pub(crate) mod test { let data = Buffer::new(1, 512); let offset = BlockIndex(7); let (_res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Read { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Read { + offset, + data, + done, + io_guard, + })); // The first read has no block contexts, because it was unwritten let data = BytesMut::from([0u8; 512].as_slice()); @@ -4135,7 +4239,13 @@ pub(crate) mod test { let data = Buffer::new(1, 512); let offset = BlockIndex(7); let (_res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Read { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Read { + offset, + data, + done, + io_guard, + })); // The first read has no block contexts, because it was unwritten let data = BytesMut::from([0u8; 512].as_slice()); @@ -4195,7 +4305,13 @@ pub(crate) mod test { data.extend_from_slice(vec![1; NODEFER_SIZE].as_slice()); let offset = BlockIndex(7); let (_res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Write { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Write { + offset, + data, + done, + io_guard, + })); assert_eq!(up.deferred_ops.len(), 0); // Submit a long write, which should be deferred @@ -4203,7 +4319,13 @@ pub(crate) mod test { data.extend_from_slice(vec![2; DEFER_SIZE].as_slice()); let offset = BlockIndex(7); let (_res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Write { 
offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Write { + offset, + data, + done, + io_guard, + })); assert_eq!(up.deferred_ops.len(), 1); assert_eq!(up.deferred_msgs.len(), 0); @@ -4213,7 +4335,13 @@ pub(crate) mod test { data.extend_from_slice(vec![3; NODEFER_SIZE].as_slice()); let offset = BlockIndex(7); let (_res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Write { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Write { + offset, + data, + done, + io_guard, + })); assert_eq!(up.deferred_ops.len(), 2); assert_eq!(up.deferred_msgs.len(), 0); } @@ -4235,7 +4363,13 @@ pub(crate) mod test { let data = Buffer::new(1, 512); let offset = BlockIndex(7); let (res, done) = BlockOpWaiter::pair(); - up.apply(UpstairsAction::Guest(BlockOp::Read { offset, data, done })); + let io_guard = IOLimitGuard::dummy(); + up.apply(UpstairsAction::Guest(BlockOp::Read { + offset, + data, + done, + io_guard, + })); let reply = res.wait_raw().await.unwrap(); match reply {