diff --git a/upstairs/src/client.rs b/upstairs/src/client.rs
index f2e62b3ed..660af6956 100644
--- a/upstairs/src/client.rs
+++ b/upstairs/src/client.rs
@@ -10,7 +10,14 @@ use crate::{
 use crucible_common::x509::TLSContext;
 use crucible_protocol::{ReconciliationId, CRUCIBLE_MESSAGE_VERSION};
 
-use std::{collections::BTreeSet, net::SocketAddr, sync::Arc};
+use std::{
+    collections::BTreeSet,
+    net::SocketAddr,
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc,
+    },
+};
 
 use futures::StreamExt;
 use slog::{debug, error, info, o, warn, Logger};
@@ -18,7 +25,7 @@ use tokio::{
     io::AsyncWriteExt,
     net::{TcpSocket, TcpStream},
     sync::{mpsc, oneshot},
-    time::sleep_until,
+    time::{sleep_until, Duration},
 };
 use tokio_util::codec::{Encoder, FramedRead};
 use uuid::Uuid;
@@ -154,6 +161,12 @@ pub(crate) struct DownstairsClient {
     /// IO state counters
     pub(crate) io_state_count: ClientIOStateCount,
 
+    /// Bytes in queues for this client
+    ///
+    /// This includes read, write, and write-unwritten jobs, and is used to
+    /// estimate per-client backpressure to keep the 3x downstairs in sync.
+    pub(crate) bytes_outstanding: u64,
+
     /// UUID for this downstairs region
     ///
     /// Unpopulated until provided by `Message::RegionInfo`
@@ -216,6 +229,9 @@ pub(crate) struct DownstairsClient {
 
     /// Session ID for a clients connection to a downstairs.
     connection_id: ConnectionId,
+
+    /// Per-client delay, shared with the [`ClientIoTask`]
+    client_delay_us: Arc<AtomicU64>,
 }
 
 impl DownstairsClient {
@@ -226,6 +242,7 @@ impl DownstairsClient {
         log: Logger,
         tls_context: Option<Arc<TLSContext>>,
     ) -> Self {
+        let client_delay_us = Arc::new(AtomicU64::new(0));
         Self {
             cfg,
             client_task: Self::new_io_task(
@@ -234,6 +251,7 @@ impl DownstairsClient {
                 false, // do not start the task until GoActive
                 client_id,
                 tls_context.clone(),
+                client_delay_us.clone(),
                 &log,
             ),
             client_id,
@@ -252,7 +270,9 @@ impl DownstairsClient {
             region_metadata: None,
             repair_info: None,
             io_state_count: ClientIOStateCount::new(),
+            bytes_outstanding: 0,
             connection_id: ConnectionId(0),
+            client_delay_us,
         }
     }
 
@@ -262,6 +282,7 @@ impl DownstairsClient {
     /// client will disappear into the void.
     #[cfg(test)]
     fn test_default() -> Self {
+        let client_delay_us = Arc::new(AtomicU64::new(0));
         let cfg = Arc::new(UpstairsConfig {
             encryption_context: None,
             upstairs_id: Uuid::new_v4(),
@@ -289,7 +310,9 @@ impl DownstairsClient {
             region_metadata: None,
             repair_info: None,
             io_state_count: ClientIOStateCount::new(),
+            bytes_outstanding: 0,
             connection_id: ConnectionId(0),
+            client_delay_us,
         }
     }
 
@@ -407,9 +430,26 @@ impl DownstairsClient {
         job: &mut DownstairsIO,
         new_state: IOState,
     ) -> IOState {
+        let is_running =
+            matches!(new_state, IOState::New | IOState::InProgress);
         self.io_state_count.incr(&new_state);
         let old_state = job.state.insert(self.client_id, new_state);
+        let was_running =
+            matches!(old_state, IOState::New | IOState::InProgress);
         self.io_state_count.decr(&old_state);
+
+        // Update our bytes-in-flight counter
+        if was_running && !is_running {
+            self.bytes_outstanding = self
+                .bytes_outstanding
+                .checked_sub(job.work.job_bytes())
+                .unwrap();
+        } else if is_running && !was_running {
+            // This should only happen if a job is replayed, but that still
+            // counts!
+            self.bytes_outstanding += job.work.job_bytes();
+        }
+
         old_state
     }
 
@@ -471,8 +511,12 @@ impl DownstairsClient {
     }
 
     /// Sets this job as skipped and moves it to `skipped_jobs`
+    ///
+    /// # Panics
+    /// If the job is not new or in-progress
     pub(crate) fn skip_job(&mut self, job: &mut DownstairsIO) {
-        self.set_job_state(job, IOState::Skipped);
+        let prev_state = self.set_job_state(job, IOState::Skipped);
+        assert!(matches!(prev_state, IOState::New | IOState::InProgress));
         self.skipped_jobs.insert(job.ds_id);
     }
 
@@ -628,6 +672,7 @@ impl DownstairsClient {
         }
 
         self.connection_id.update();
+        // Restart with a short delay
         self.start_task(true, auto_promote);
     }
 
@@ -652,6 +697,7 @@ impl DownstairsClient {
             connect,
             self.client_id,
             self.tls_context.clone(),
+            self.client_delay_us.clone(),
             &self.log,
         );
     }
@@ -662,6 +708,7 @@ impl DownstairsClient {
         connect: bool,
         client_id: ClientId,
         tls_context: Option<Arc<TLSContext>>,
+        client_delay_us: Arc<AtomicU64>,
         log: &Logger,
     ) -> ClientTaskHandle {
         #[cfg(test)]
@@ -672,6 +719,7 @@ impl DownstairsClient {
             connect,
             client_id,
             tls_context,
+            client_delay_us,
             log,
         )
     } else {
@@ -685,6 +733,7 @@ impl DownstairsClient {
             connect,
             client_id,
             tls_context,
+            client_delay_us,
             log,
         )
     }
@@ -695,6 +744,7 @@ impl DownstairsClient {
         connect: bool,
         client_id: ClientId,
         tls_context: Option<Arc<TLSContext>>,
+        client_delay_us: Arc<AtomicU64>,
         log: &Logger,
     ) -> ClientTaskHandle {
         // These channels must support at least MAX_ACTIVE_COUNT messages;
@@ -730,6 +780,7 @@ impl DownstairsClient {
                 log: log.clone(),
             },
             delay,
+            client_delay_us,
             log,
         };
         c.run().await
@@ -945,6 +996,9 @@ impl DownstairsClient {
                 IOState::New
             }
         };
+        if r == IOState::New {
+            self.bytes_outstanding += io.work.job_bytes();
+        }
         self.io_state_count.incr(&r);
         r
     }
@@ -1261,9 +1315,8 @@ impl DownstairsClient {
             }
         };
 
-        let old_state = job.state.insert(self.client_id, new_state.clone());
-        self.io_state_count.decr(&old_state);
-        self.io_state_count.incr(&new_state);
+        // Update the state, maintaining various counters
+        let old_state = self.set_job_state(job, new_state.clone());
 
         /*
          * Verify the job was InProgress
@@ -2209,6 +2262,10 @@ impl DownstairsClient {
         (self.io_state_count.new + self.io_state_count.in_progress) as usize
     }
 
+    pub(crate) fn total_bytes_outstanding(&self) -> usize {
+        self.bytes_outstanding as usize
+    }
+
     /// Returns a unique ID for the current connection, or `None`
     ///
     /// This can be used to disambiguate between messages returned from
@@ -2220,6 +2277,16 @@ impl DownstairsClient {
             None
         }
     }
+
+    /// Sets the per-client delay
+    pub(crate) fn set_delay_us(&self, delay: u64) {
+        self.client_delay_us.store(delay, Ordering::Relaxed);
+    }
+
+    /// Looks up the per-client delay
+    pub(crate) fn get_delay_us(&self) -> u64 {
+        self.client_delay_us.load(Ordering::Relaxed)
+    }
 }
 
 /// How to handle "promote to active" requests
@@ -2420,6 +2487,9 @@ struct ClientIoTask {
     /// Handle for the rx task
     recv_task: ClientRxTask,
 
+    /// Shared handle to receive per-client backpressure delay
+    client_delay_us: Arc<AtomicU64>,
+
     log: Logger,
 }
 
@@ -2692,6 +2762,24 @@ impl ClientIoTask {
             + std::marker::Send
             + 'static,
     {
+        // Delay communication with this client based on backpressure, to keep
+        // the three clients relatively in sync with each other.
+        //
+        // We don't need to delay writes, because they're already constrained by
+        // the global backpressure system and cannot build up an unbounded
+        // queue.  This is admittedly quite subtle; see crucible#1167 for
+        // discussions and graphs.
+        if !matches!(
+            m,
+            ClientRequest::Message(Message::Write { .. })
+                | ClientRequest::RawMessage(RawMessage::Write { .. }, ..)
+        ) {
+            let d = self.client_delay_us.load(Ordering::Relaxed);
+            if d > 0 {
+                tokio::time::sleep(Duration::from_micros(d)).await;
+            }
+        }
+
         // There's some duplication between this function and `cmd_loop` above,
         // but it's not obvious whether there's a cleaner way to organize stuff.
         tokio::select! {
diff --git a/upstairs/src/downstairs.rs b/upstairs/src/downstairs.rs
index 53cf43a4e..25edbad39 100644
--- a/upstairs/src/downstairs.rs
+++ b/upstairs/src/downstairs.rs
@@ -3,6 +3,7 @@ use std::{
     collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
     net::SocketAddr,
     sync::Arc,
+    time::Duration,
 };
 
 use crate::{
@@ -39,6 +40,9 @@ pub(crate) struct Downstairs {
     /// Per-client data
     pub(crate) clients: ClientData<DownstairsClient>,
 
+    /// Per-client backpressure configuration
+    backpressure_config: DownstairsBackpressureConfig,
+
     /// The active list of IO for the downstairs.
     pub(crate) ds_active: ActiveJobs,
 
@@ -220,6 +224,23 @@ impl Downstairs {
         let clients = clients.map(Option::unwrap);
         Self {
             clients: ClientData(clients),
+            backpressure_config: DownstairsBackpressureConfig {
+                // start byte-based backpressure at 25 MiB of difference
+                bytes_start: 25 * 1024 * 1024,
+                // bytes_scale is chosen to have 1.5 ms of delay at 100 MiB
+                // of discrepancy
+                bytes_scale: 5e-7,
+
+                // start job-based backpressure at 100 jobs of discrepancy
+                jobs_start: 100,
+                // jobs_scale is chosen to have 1 ms of per-job delay at 900
+                // jobs of discrepancy
+                jobs_scale: 0.04,
+
+                // max delay is 10 ms.  This is meant to be a gentle nudge,
+                // not a giant shove, and even 10 ms may be too high.
+                max_delay: Duration::from_millis(10),
+            },
             cfg,
             next_flush: 0,
             ds_active: ActiveJobs::new(),
@@ -659,6 +680,11 @@ impl Downstairs {
         // Restart the IO task for that specific client
         self.clients[client_id].reinitialize(auto_promote);
 
+        for i in ClientId::iter() {
+            // Clear per-client delay, because we're starting a new session
+            self.clients[i].set_delay_us(0);
+        }
+
         // Special-case: if a Downstairs goes away midway through initial
         // reconciliation, then we have to manually abort reconciliation.
         if self.clients.iter().any(|c| c.state() == DsState::Reconcile) {
@@ -2293,6 +2319,8 @@ impl Downstairs {
             self.clients[cid].enqueue(&mut io, last_repair_extent);
             if matches!(job_state, IOState::Skipped) {
                 skipped += 1;
+            } else {
+                assert_eq!(job_state, IOState::New);
             }
         }
 
@@ -3523,6 +3551,109 @@ impl Downstairs {
         (gw, ds)
     }
+
+    #[cfg(test)]
+    pub(crate) fn disable_client_backpressure(&mut self) {
+        self.backpressure_config.max_delay = Duration::ZERO;
+    }
+
+    pub(crate) fn set_client_backpressure(&self) {
+        let mut jobs = vec![];
+        let mut bytes = vec![];
+        for c in self.clients.iter() {
+            if matches!(c.state(), DsState::Active) {
+                jobs.push(c.total_live_work());
+                bytes.push(c.total_bytes_outstanding());
+            }
+        }
+        // If none of the clients are active, then disable per-client
+        // backpressure
+        if jobs.is_empty() {
+            for c in self.clients.iter() {
+                c.set_delay_us(0);
+            }
+            return;
+        }
+
+        // The "slowest" Downstairs will have the highest values for jobs and
+        // bytes, which we calculate here.  Then, we'll apply delays to clients
+        // based on those values.
+        let max_jobs = jobs.into_iter().max().unwrap();
+        let max_bytes = bytes.into_iter().max().unwrap();
+
+        let mut delays = ClientData::new(None);
+        for i in ClientId::iter() {
+            let c = &self.clients[i];
+            if matches!(c.state(), DsState::Active) {
+                // These values represent how much **faster** we are than the
+                // slowest downstairs, and hence cause us to exert backpressure
+                // on this client (to slow it down).
+                let job_gap = (max_jobs - c.total_live_work()) as u64;
+                let bytes_gap =
+                    (max_bytes - c.total_bytes_outstanding()) as u64;
+
+                let job_delay_us = (job_gap
+                    .saturating_sub(self.backpressure_config.jobs_start)
+                    as f64
+                    * self.backpressure_config.jobs_scale)
+                    .powf(2.0);
+
+                let bytes_delay_us = (bytes_gap
+                    .saturating_sub(self.backpressure_config.bytes_start)
+                    as f64
+                    * self.backpressure_config.bytes_scale)
+                    .powf(2.0);
+
+                let delay_us = (job_delay_us.max(bytes_delay_us) as u64)
+                    .min(self.backpressure_config.max_delay.as_micros() as u64);
+
+                delays[i] = Some(delay_us);
+            }
+        }
+        // Cancel out any common delay, because it wouldn't be useful.
+        //
+        // (Seeing common delay would be unusual, because it would indicate
+        // that one Downstairs is ahead by the bytes-based metric and another
+        // is ahead by the jobs-based metric)
+        let min_delay = *delays.iter().flatten().min().unwrap();
+        delays.iter_mut().flatten().for_each(|c| *c -= min_delay);
+
+        // Apply delay to clients
+        for i in ClientId::iter() {
+            if let Some(delay) = delays[i] {
+                self.clients[i].set_delay_us(delay);
+            } else {
+                self.clients[i].set_delay_us(0);
+            }
+        }
+    }
+}
+
+/// Configuration for per-client backpressure
+///
+/// Per-client backpressure adds an artificial delay to the client queues, to
+/// keep the three clients relatively in sync.  The delay is varied based on
+/// two metrics:
+///
+/// - number of write bytes outstanding
+/// - queue length
+///
+/// These metrics are _relative_ to the slowest downstairs; the goal is to
+/// slow down the faster Downstairs to keep the gap bounded.
+#[derive(Copy, Clone, Debug)]
+struct DownstairsBackpressureConfig {
+    /// When should backpressure start (in bytes)?
+    bytes_start: u64,
+    /// Scale for byte-based quadratic backpressure
+    bytes_scale: f64,
+
+    /// When should job-count-based backpressure start?
+    jobs_start: u64,
+    /// Scale for job-count-based quadratic backpressure
+    jobs_scale: f64,
+
+    /// Maximum delay
+    max_delay: Duration,
 }
 
 #[cfg(test)]
diff --git a/upstairs/src/guest.rs b/upstairs/src/guest.rs
index 45540c9ce..1466a081a 100644
--- a/upstairs/src/guest.rs
+++ b/upstairs/src/guest.rs
@@ -918,6 +918,11 @@ impl GuestIoHandle {
         self.backpressure_config.queue_max_delay = Duration::ZERO;
     }
 
+    #[cfg(test)]
+    pub fn is_queue_backpressure_disabled(&self) -> bool {
+        self.backpressure_config.queue_max_delay == Duration::ZERO
+    }
+
     /// Set `self.backpressure_us` based on outstanding IO ratio
     pub fn set_backpressure(&self, bytes: u64, ratio: f64) {
         // Check to see if the number of outstanding write bytes (between
diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs
index 9dcf206eb..ed9a8de88 100644
--- a/upstairs/src/lib.rs
+++ b/upstairs/src/lib.rs
@@ -1331,6 +1331,23 @@ impl IOop {
             false
         }
     }
+
+    /// Returns the number of bytes written or read in this job
+    fn job_bytes(&self) -> u64 {
+        match &self {
+            IOop::Write { data, .. } | IOop::WriteUnwritten { data, .. } => {
+                data.io_size_bytes as u64
+            }
+            IOop::Read { requests, .. } => {
+                requests
+                    .first()
+                    .map(|r| r.offset.block_size_in_bytes())
+                    .unwrap_or(0) as u64
+                    * requests.len() as u64
+            }
+            _ => 0,
+        }
+    }
 }
 
 /*
@@ -2024,6 +2041,8 @@ pub struct Arg {
     pub ds_extents_repaired: [usize; 3],
     /// Times we have live confirmed an extent on this downstairs.
     pub ds_extents_confirmed: [usize; 3],
+    /// Per-client delay to keep them roughly in sync
+    pub ds_delay_us: [usize; 3],
     /// Times we skipped repairing a downstairs because we are read_only.
     pub ds_ro_lr_skipped: [usize; 3],
 }
@@ -2071,6 +2090,9 @@ pub fn up_main(
         None
     };
 
+    #[cfg(test)]
+    let disable_backpressure = guest.is_queue_backpressure_disabled();
+
     /*
      * Build the Upstairs struct that we use to share data between
      * the different async tasks
@@ -2078,6 +2100,11 @@ pub fn up_main(
     let mut up =
         upstairs::Upstairs::new(&opt, gen, region_def, guest, tls_context);
 
+    #[cfg(test)]
+    if disable_backpressure {
+        up.disable_client_backpressure();
+    }
+
     if let Some(pr) = producer_registry {
         let ups = up.stats.clone();
         if let Err(e) = pr.register_producer(ups) {
diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs
index 6e76464b0..10eb8855f 100644
--- a/upstairs/src/upstairs.rs
+++ b/upstairs/src/upstairs.rs
@@ -397,6 +397,11 @@ impl Upstairs {
         }
     }
 
+    #[cfg(test)]
+    pub(crate) fn disable_client_backpressure(&mut self) {
+        self.downstairs.disable_client_backpressure();
+    }
+
     /// Build an Upstairs for simple tests
     #[cfg(test)]
     pub fn test_default(ddef: Option<RegionDefinition>) -> Self {
@@ -738,6 +743,8 @@ impl Upstairs {
             self.downstairs.collect_stats(|c| c.stats.extents_repaired);
         let ds_extents_confirmed =
             self.downstairs.collect_stats(|c| c.stats.extents_confirmed);
+        let ds_delay_us =
+            self.downstairs.collect_stats(|c| c.get_delay_us() as usize);
         let ds_ro_lr_skipped =
             self.downstairs.collect_stats(|c| c.stats.ro_lr_skipped);
 
@@ -762,6 +769,7 @@ impl Upstairs {
             ds_flow_control,
             ds_extents_repaired,
             ds_extents_confirmed,
+            ds_delay_us,
             ds_ro_lr_skipped,
         };
         ("stats", arg)
@@ -2016,6 +2024,7 @@ impl Upstairs {
             .reinitialize(client_id, auto_promote, &self.state);
     }
 
+    /// Sets both guest and per-client backpressure
     fn set_backpressure(&self) {
         let dsw_max = self
             .downstairs
@@ -2027,6 +2036,8 @@ impl Upstairs {
         let ratio = dsw_max as f64 / crate::IO_OUTSTANDING_MAX as f64;
         self.guest
             .set_backpressure(self.downstairs.write_bytes_outstanding(), ratio);
+
+        self.downstairs.set_client_backpressure();
     }
 
     /// Returns the `RegionDefinition`
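
To make the calibration comments in DownstairsBackpressureConfig concrete, here is a standalone sketch of the quadratic delay curve (illustrative only, not part of the patch; BackpressureParams and delay_us are hypothetical names mirroring the fields above):

use std::time::Duration;

/// Stand-in for one (start, scale) pair from DownstairsBackpressureConfig.
struct BackpressureParams {
    start: u64,          // gap (bytes or jobs) where backpressure begins
    scale: f64,          // multiplier applied to the excess gap before squaring
    max_delay: Duration, // cap on the resulting delay
}

impl BackpressureParams {
    /// Quadratic curve: zero until `start`, then ((gap - start) * scale)^2
    /// microseconds, capped at `max_delay`.
    fn delay_us(&self, gap: u64) -> u64 {
        let d = (gap.saturating_sub(self.start) as f64 * self.scale).powf(2.0);
        (d as u64).min(self.max_delay.as_micros() as u64)
    }
}

fn main() {
    let max_delay = Duration::from_millis(10);
    let bytes = BackpressureParams {
        start: 25 * 1024 * 1024, // 25 MiB
        scale: 5e-7,
        max_delay,
    };
    let jobs = BackpressureParams {
        start: 100,
        scale: 0.04,
        max_delay,
    };

    // ~1.5 ms at 100 MiB of byte discrepancy, matching the comment
    assert_eq!(bytes.delay_us(100 * 1024 * 1024), 1546);
    // ~1 ms at 900 jobs of discrepancy, matching the comment
    assert_eq!(jobs.delay_us(900), 1024);
}

Both calibration points in the comments check out: ((100 MiB - 25 MiB) * 5e-7)^2 is about 1546 us (roughly 1.5 ms), and ((900 - 100) * 0.04)^2 = 1024 us (roughly 1 ms). In the patch, the larger of the two delays wins via job_delay_us.max(bytes_delay_us), and the result is capped at max_delay.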
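The common-delay cancellation at the end of set_client_backpressure can also be seen in isolation. A small self-contained example with hypothetical numbers (a plain array stands in for ClientData; None marks an inactive client):

fn main() {
    // Hypothetical: client 0 holds the jobs maximum (so it is delayed only
    // by the bytes metric), client 1 holds the bytes maximum (delayed only
    // by the jobs metric), and client 2 trails both, so every active client
    // ends up with a nonzero delay.
    let mut delays: [Option<u64>; 3] = [Some(300), Some(200), Some(700)];

    // A delay shared by all active clients adds latency without closing any
    // gap between them, so subtract it off (mirroring the min_delay logic).
    let min_delay = *delays.iter().flatten().min().unwrap();
    delays.iter_mut().flatten().for_each(|d| *d -= min_delay);

    assert_eq!(delays, [Some(100), Some(0), Some(500)]);
}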