diff --git a/upstairs/src/client.rs b/upstairs/src/client.rs index 15b36684c..b5dde9d61 100644 --- a/upstairs/src/client.rs +++ b/upstairs/src/client.rs @@ -547,7 +547,11 @@ impl DownstairsClient { /// /// # Panics /// If `self.client_task` is not `None`, or `self.target_addr` is `None` - pub(crate) fn reinitialize(&mut self, up_state: &UpstairsState) { + pub(crate) fn reinitialize( + &mut self, + up_state: &UpstairsState, + can_replay: bool, + ) { // Clear this Downstair's repair address, and let the YesItsMe set it. // This works if this Downstairs is new, reconnecting, or was replaced // entirely; the repair address could have changed in any of these @@ -570,6 +574,9 @@ impl DownstairsClient { let current = &self.state; let new_state = match current { + DsState::Active | DsState::Offline if !can_replay => { + Some(DsState::Faulted) + } DsState::Active => Some(DsState::Offline), DsState::LiveRepair | DsState::LiveRepairReady => { Some(DsState::Faulted) @@ -840,8 +847,12 @@ impl DownstairsClient { reason: ClientStopReason, ) { let new_state = match self.state { - DsState::Active => DsState::Offline, - DsState::Offline => DsState::Offline, + DsState::Active | DsState::Offline + if matches!(reason, ClientStopReason::IneligibleForReplay) => + { + DsState::Faulted + } + DsState::Active | DsState::Offline => DsState::Offline, DsState::Faulted => DsState::Faulted, DsState::Deactivated => DsState::New, DsState::Reconcile => DsState::New, @@ -2408,6 +2419,9 @@ pub(crate) enum ClientStopReason { /// The upstairs has requested that we deactivate when we were offline OfflineDeactivated, + + /// The Upstairs has dropped jobs that would be needed for replay + IneligibleForReplay, } /// Response received from the I/O task diff --git a/upstairs/src/downstairs.rs b/upstairs/src/downstairs.rs index 23a889169..16b97774c 100644 --- a/upstairs/src/downstairs.rs +++ b/upstairs/src/downstairs.rs @@ -88,6 +88,19 @@ pub(crate) struct Downstairs { /// (including automatic flushes). 
next_flush: u64, + /// Indicates whether we are eligible for replay + /// + /// We are only eligible for replay if all jobs since the last flush are + /// buffered (i.e. none have been retired by a `Barrier` operation). + can_replay: bool, + + /// How many `Flush` or `Barrier` operations are pending? + /// + /// We only want to send a `Barrier` if there isn't already one pending, so + /// we track it here (incrementing in `submit_flush` / `submit_barrier` and + /// decrementing in `retire_check`). + pending_barrier: usize, + /// Ringbuf of recently acked job IDs. acked_ids: AllocRingBuffer, @@ -291,6 +304,8 @@ impl Downstairs { }, cfg, next_flush: 0, + can_replay: true, + pending_barrier: 0, ds_active: ActiveJobs::new(), gw_active: HashSet::new(), acked_ids: AllocRingBuffer::new(2048), @@ -523,7 +538,7 @@ impl Downstairs { // Restart the IO task for that specific client, transitioning to a new // state. - self.clients[client_id].reinitialize(up_state); + self.clients[client_id].reinitialize(up_state, self.can_replay); for i in ClientId::iter() { // Clear per-client delay, because we're starting a new session @@ -1887,6 +1902,7 @@ impl Downstairs { extent_limit: extent_under_repair, }; + self.pending_barrier += 1; self.enqueue( next_id, flush, @@ -1896,6 +1912,47 @@ impl Downstairs { next_id } + /// Checks to see whether a `Barrier` operation is needed + /// + /// A `Barrier` is needed if we have buffered more than + /// `IO_CACHED_MAX_BYTES/JOBS` worth of complete jobs, and there are no + /// other barrier (or flush) operations in flight + pub(crate) fn needs_barrier(&self) -> bool { + if self.pending_barrier > 0 { + return false; + } + + // n.b. This may not be 100% reliable: if different Downstairs have + // finished a different subset of jobs, then it's theoretically possible + // for each DownstairsClient to be under our limits, but for the true + // number of cached bytes/jobs to be over the limits. 
+ // + // It's hard to imagine how we could encounter such a situation, given + // job dependencies and no out-of-order execution, so this is more of a + // "fun fact" and less an actual concern. + let max_jobs = self + .clients + .iter() + .map(|c| { + let i = c.io_state_job_count(); + i.skipped + i.done + i.error + }) + .max() + .unwrap(); + let max_bytes = self + .clients + .iter() + .map(|c| { + let i = c.io_state_byte_count(); + i.skipped + i.done + i.error + }) + .max() + .unwrap(); + + max_jobs as u64 >= crate::IO_CACHED_MAX_JOBS + || max_bytes >= crate::IO_CACHED_MAX_BYTES + } + pub(crate) fn submit_barrier(&mut self) -> JobId { let next_id = self.next_id(); cdt::gw__barrier__start!(|| (next_id.0)); @@ -1906,6 +1963,7 @@ impl Downstairs { let dependencies = self.ds_active.deps_for_flush(next_id); debug!(self.log, "IO Barrier {next_id} has deps {dependencies:?}"); + self.pending_barrier += 1; self.enqueue( next_id, IOop::Barrier { dependencies }, @@ -2439,11 +2497,17 @@ impl Downstairs { Ok(ReplaceResult::Started) } + /// Checks whether the given client state should go from Offline -> Faulted + /// + /// # Panics + /// If the given client is not in the `Offline` state pub(crate) fn check_gone_too_long( &mut self, client_id: ClientId, up_state: &UpstairsState, ) { + assert_eq!(self.clients[client_id].state(), DsState::Offline); + let byte_count = self.clients[client_id].total_bytes_outstanding(); let work_count = self.clients[client_id].total_live_work(); let failed = if work_count > crate::IO_OUTSTANDING_MAX_JOBS { @@ -2458,6 +2522,13 @@ impl Downstairs { "downstairs failed, too many outstanding bytes {byte_count}" ); Some(ClientStopReason::TooManyOutstandingBytes) + } else if !self.can_replay { + // XXX can this actually happen? 
+ warn!( + self.log, + "downstairs became ineligible for replay while offline" + ); + Some(ClientStopReason::IneligibleForReplay) } else { None }; @@ -2589,9 +2660,12 @@ impl Downstairs { /// writes and if they aren't included in replay then the write will /// never start. fn retire_check(&mut self, ds_id: JobId) { - if !self.is_flush(ds_id) { - return; - } + let job = self.ds_active.get(&ds_id).expect("checked missing job"); + let can_replay = match job.work { + IOop::Flush { .. } => true, + IOop::Barrier { .. } => false, + _ => return, + }; // Only a completed flush will remove jobs from the active queue - // currently we have to keep everything around for use during replay @@ -2645,6 +2719,13 @@ impl Downstairs { for &id in &retired { let job = self.ds_active.remove(&id); + // Update our barrier count for the removed job + if matches!(job.work, IOop::Flush { .. } | IOop::Barrier { .. }) + { + self.pending_barrier = + self.pending_barrier.checked_sub(1).unwrap(); + } + // Jobs should have their backpressure contribution removed when // they are completed (in `process_io_completion_inner`), // **not** when they are retired. 
We'll do a sanity check here @@ -2666,6 +2747,9 @@ impl Downstairs { for cid in ClientId::iter() { self.clients[cid].skipped_jobs.retain(|&x| x >= ds_id); } + + // Update the flag indicating whether replay is allowed + self.can_replay = can_replay; } } @@ -4176,6 +4260,13 @@ impl Downstairs { self.ddef = Some(ddef); } + /// Checks whether there are any in-progress jobs present + pub(crate) fn has_live_jobs(&self) -> bool { + self.clients + .iter() + .any(|c| c.backpressure_counters.get_jobs() > 0) + } + /// Returns the per-client state for the given job /// /// This is a helper function to make unit tests shorter diff --git a/upstairs/src/dummy_downstairs_tests.rs b/upstairs/src/dummy_downstairs_tests.rs index e9e277a31..69bda4a19 100644 --- a/upstairs/src/dummy_downstairs_tests.rs +++ b/upstairs/src/dummy_downstairs_tests.rs @@ -14,7 +14,10 @@ use crate::BlockIO; use crate::Buffer; use crate::CrucibleError; use crate::DsState; -use crate::{IO_OUTSTANDING_MAX_BYTES, IO_OUTSTANDING_MAX_JOBS}; +use crate::{ + IO_CACHED_MAX_BYTES, IO_CACHED_MAX_JOBS, IO_OUTSTANDING_MAX_BYTES, + IO_OUTSTANDING_MAX_JOBS, +}; use crucible_client_types::CrucibleOpts; use crucible_common::Block; use crucible_common::BlockIndex; @@ -264,23 +267,26 @@ impl DownstairsHandle { /// # Panics /// If a non-flush message arrives pub async fn ack_flush(&mut self) -> u64 { - let Message::Flush { - job_id, - flush_number, - upstairs_id, - .. - } = self.recv().await.unwrap() - else { - panic!("saw non flush!"); - }; - self.send(Message::FlushAck { - upstairs_id, - session_id: self.upstairs_session_id.unwrap(), - job_id, - result: Ok(()), - }) - .unwrap(); - flush_number + match self.recv().await.unwrap() { + Message::Flush { + job_id, + flush_number, + upstairs_id, + .. 
+ } => { + self.send(Message::FlushAck { + upstairs_id, + session_id: self.upstairs_session_id.unwrap(), + job_id, + result: Ok(()), + }) + .unwrap(); + flush_number + } + m => { + panic!("saw non flush {m:?}"); + } + } } /// Awaits a `Message::Write { .. }` and sends a `WriteAck` @@ -290,17 +296,45 @@ impl DownstairsHandle { /// # Panics /// If a non-write message arrives pub async fn ack_write(&mut self) -> JobId { - let Message::Write { header, .. } = self.recv().await.unwrap() else { - panic!("saw non write!"); - }; - self.send(Message::WriteAck { - upstairs_id: header.upstairs_id, - session_id: self.upstairs_session_id.unwrap(), - job_id: header.job_id, - result: Ok(()), - }) - .unwrap(); - header.job_id + match self.recv().await.unwrap() { + Message::Write { header, .. } => { + self.send(Message::WriteAck { + upstairs_id: header.upstairs_id, + session_id: self.upstairs_session_id.unwrap(), + job_id: header.job_id, + result: Ok(()), + }) + .unwrap(); + header.job_id + } + m => panic!("saw non write: {m:?}"), + } + } + + /// Awaits a `Message::Barrier { .. }` and sends a `BarrierAck` + /// + /// Returns the job ID for further checks. + /// + /// # Panics + /// If a non-barrier message arrives + pub async fn ack_barrier(&mut self) -> JobId { + match self.recv().await.unwrap() { + Message::Barrier { + upstairs_id, + job_id, + .. + } => { + self.send(Message::BarrierAck { + upstairs_id, + session_id: self.upstairs_session_id.unwrap(), + job_id, + result: Ok(()), + }) + .unwrap(); + job_id + } + m => panic!("saw non barrier: {m:?}"), + } } /// Awaits a `Message::Read` and sends a blank `ReadResponse` @@ -310,26 +344,27 @@ impl DownstairsHandle { /// # Panics /// If a non-read message arrives pub async fn ack_read(&mut self) -> JobId { - let Message::ReadRequest { - job_id, - upstairs_id, - .. 
- } = self.recv().await.unwrap() - else { - panic!("saw non write!"); - }; - let (block, data) = make_blank_read_response(); - self.send(Message::ReadResponse { - header: ReadResponseHeader { - upstairs_id, - session_id: self.upstairs_session_id.unwrap(), + match self.recv().await.unwrap() { + Message::ReadRequest { job_id, - blocks: Ok(vec![block]), - }, - data: data.clone(), - }) - .unwrap(); - job_id + upstairs_id, + .. + } => { + let (block, data) = make_blank_read_response(); + self.send(Message::ReadResponse { + header: ReadResponseHeader { + upstairs_id, + session_id: self.upstairs_session_id.unwrap(), + job_id, + blocks: Ok(vec![block]), + }, + data: data.clone(), + }) + .unwrap(); + job_id + } + m => panic!("saw non read {m:?}"), + } } } @@ -471,14 +506,31 @@ pub struct TestHarness { /// Number of extents in `TestHarness::default_config` const DEFAULT_EXTENT_COUNT: u32 = 25; +const DEFAULT_BLOCK_COUNT: u64 = 10; + +struct TestOpts { + flush_timeout: f32, + read_only: bool, + disable_backpressure: bool, +} impl TestHarness { pub async fn new() -> TestHarness { - Self::new_(false).await + Self::new_with_opts(TestOpts { + flush_timeout: 86400.0, + read_only: false, + disable_backpressure: true, + }) + .await } pub async fn new_ro() -> TestHarness { - Self::new_(true).await + Self::new_with_opts(TestOpts { + flush_timeout: 86400.0, + read_only: true, + disable_backpressure: true, + }) + .await } pub fn ds1(&mut self) -> &mut DownstairsHandle { @@ -494,7 +546,7 @@ impl TestHarness { // IO_OUTSTANDING_MAX_BYTES in less than IO_OUTSTANDING_MAX_JOBS, // i.e. letting us test both byte and job fault conditions. 
extent_count: DEFAULT_EXTENT_COUNT, - extent_size: Block::new_512(10), + extent_size: Block::new_512(DEFAULT_BLOCK_COUNT), gen_numbers: vec![0u64; DEFAULT_EXTENT_COUNT as usize], flush_numbers: vec![0u64; DEFAULT_EXTENT_COUNT as usize], @@ -502,10 +554,10 @@ impl TestHarness { } } - async fn new_(read_only: bool) -> TestHarness { + async fn new_with_opts(opts: TestOpts) -> TestHarness { let log = csl(); - let cfg = Self::default_config(read_only); + let cfg = Self::default_config(opts.read_only); let ds1 = cfg.clone().start(log.new(o!("downstairs" => 1))).await; let ds2 = cfg.clone().start(log.new(o!("downstairs" => 2))).await; @@ -514,15 +566,17 @@ impl TestHarness { // Configure our guest without backpressure, to speed up tests which // require triggering a timeout let (g, mut io) = Guest::new(Some(log.clone())); - io.disable_queue_backpressure(); - io.disable_byte_backpressure(); + if opts.disable_backpressure { + io.disable_queue_backpressure(); + io.disable_byte_backpressure(); + } let guest = Arc::new(g); let crucible_opts = CrucibleOpts { id: Uuid::new_v4(), target: vec![ds1.local_addr, ds2.local_addr, ds3.local_addr], - flush_timeout: Some(86400.0), - read_only, + flush_timeout: Some(opts.flush_timeout), + read_only: opts.read_only, ..Default::default() }; @@ -1446,11 +1500,12 @@ async fn test_byte_fault_condition() { // out. 
const WRITE_SIZE: usize = 105 * 1024; // 105 KiB let write_buf = BytesMut::from(vec![1; WRITE_SIZE].as_slice()); // 50 KiB + let barrier_point = IO_CACHED_MAX_BYTES as usize / write_buf.len(); let num_jobs = IO_OUTSTANDING_MAX_BYTES as usize / write_buf.len() + 10; assert!(num_jobs < IO_OUTSTANDING_MAX_JOBS); // First, we'll send jobs until the timeout - for _ in 0..num_jobs { + for i in 0..num_jobs { // We must `spawn` here because `write` will wait for the response // to come back before returning let write_buf = write_buf.clone(); @@ -1458,11 +1513,11 @@ async fn test_byte_fault_condition() { guest.write(BlockIndex(0), write_buf).await.unwrap(); }); - // Before we're kicked out, assert we're seeing the read requests - assert!(matches!( - harness.ds1().recv().await.unwrap(), - Message::Write { .. }, - )); + // Before we're kicked out, assert we're seeing the write requests + let m = harness.ds1().recv().await.unwrap(); + if !matches!(m, Message::Write { .. },) { + panic!("got unexpected message {m:?}"); + } harness.ds2.ack_write().await; harness.ds3.ack_write().await; @@ -1474,6 +1529,17 @@ async fn test_byte_fault_condition() { assert_eq!(ds[ClientId::new(0)], DsState::Active); assert_eq!(ds[ClientId::new(1)], DsState::Active); assert_eq!(ds[ClientId::new(2)], DsState::Active); + + // Once one of the Downstairs has cached more that a certain amount of + // bytes, it will automatically send a Barrier operation. + if i == barrier_point { + let m = harness.ds1().recv().await.unwrap(); + if !matches!(m, Message::Barrier { .. 
}) { + panic!("expected Barrier, got message {m:?}"); + } + harness.ds2.ack_barrier().await; + harness.ds3.ack_barrier().await; + } } // Sleep until we're confident that the Downstairs is kicked out @@ -1605,6 +1671,13 @@ async fn test_byte_fault_condition_offline() { assert_eq!(ds[ClientId::new(1)], DsState::Active); assert_eq!(ds[ClientId::new(2)], DsState::Active); } + + // If we've sent IO_CACHED_MAX_BYTES, then we expect the Upstairs to + // insert a Barrier (because it's trying to clean out finished jobs). + if IO_CACHED_MAX_BYTES as usize / WRITE_SIZE == i { + harness.ds2.ack_barrier().await; + harness.ds3.ack_barrier().await; + } } // Confirm that the system comes up after live-repair @@ -1760,7 +1833,7 @@ async fn test_job_fault_condition() { // timeout, so that when timeout hits, the downstairs will become Faulted // instead of Offline. let num_jobs = IO_OUTSTANDING_MAX_JOBS + 200; - for _ in 0..num_jobs { + for i in 0..num_jobs { // We must `spawn` here because `write` will wait for the response to // come back before returning let h = harness.spawn(|guest| async move { @@ -1769,10 +1842,10 @@ async fn test_job_fault_condition() { }); // DS1 should be receiving messages - assert!(matches!( - harness.ds1().recv().await.unwrap(), - Message::ReadRequest { .. }, - )); + let m = harness.ds1().recv().await.unwrap(); + if !matches!(m, Message::ReadRequest { .. },) { + panic!("got unexpected message {m:?}"); + } // Respond with read responses for downstairs 2 and 3 harness.ds2.ack_read().await; @@ -1786,6 +1859,18 @@ async fn test_job_fault_condition() { assert_eq!(ds[ClientId::new(0)], DsState::Active); assert_eq!(ds[ClientId::new(1)], DsState::Active); assert_eq!(ds[ClientId::new(2)], DsState::Active); + + // When we hit IO_CACHED_MAX_JOBS, the Upstairs will attempt to send a + // barrier to retire jobs. The barrier won't succeed, because we're not + // acking it on DS1. 
+ if i + 1 == IO_CACHED_MAX_JOBS as usize { + let m = harness.ds1().recv().await.unwrap(); + if !matches!(m, Message::Barrier { .. }) { + panic!("expected Barrier, got message {m:?}"); + } + harness.ds2.ack_barrier().await; + harness.ds3.ack_barrier().await; + } } // Sleep until we're confident that the Downstairs is kicked out @@ -1888,6 +1973,7 @@ async fn test_job_fault_condition_offline() { // transition it to `Faulted` by sending it enough to hit // `IO_OUTSTANDING_MAX_JOBS` info!(harness.log, "sending more jobs to fault DS1"); + let mut barrier_count = 0; for i in num_jobs..IO_OUTSTANDING_MAX_JOBS + 200 { let h = harness.spawn(|guest| async move { let mut buffer = Buffer::new(1, 512); @@ -1910,8 +1996,17 @@ async fn test_job_fault_condition_offline() { // the Upstairs has finished updating its state). h.await.unwrap(); + // If we've sent IO_CACHED_MAX_JOBS, then we expect the Upstairs to + // insert a Barrier (because it's trying to clean out finished jobs). + if i + 1 == IO_CACHED_MAX_JOBS as usize { + // nothing arrives on DS1, because it's offline + harness.ds2.ack_barrier().await; + harness.ds3.ack_barrier().await; + barrier_count += 1; + } + let ds = harness.guest.downstairs_state().await.unwrap(); - if i < IO_OUTSTANDING_MAX_JOBS { + if i + barrier_count < IO_OUTSTANDING_MAX_JOBS { // At this point, we should still be offline assert_eq!(ds[ClientId::new(0)], DsState::Offline); assert_eq!(ds[ClientId::new(1)], DsState::Active); @@ -2716,3 +2811,83 @@ async fn test_no_send_offline() { e => panic!("invalid message {e:?}; expected Write"), } } + +/// Test that barrier operations are sent periodically +#[tokio::test] +async fn test_jobs_based_barrier() { + let mut harness = TestHarness::new_with_opts(TestOpts { + flush_timeout: 0.5, + read_only: false, + disable_backpressure: false, + }) + .await; + + for i in 1..IO_CACHED_MAX_JOBS * 3 { + // Send a write, which will succeed + let write_handle = harness.spawn(|guest| async move { + let mut data = 
BytesMut::new(); + data.resize(512, 1u8); + guest.write(BlockIndex(0), data).await.unwrap(); + }); + + // Ensure that all three clients got the write request + harness.ds1().ack_write().await; + harness.ds2.ack_write().await; + harness.ds3.ack_write().await; + + write_handle.await.unwrap(); + + if i % IO_CACHED_MAX_JOBS == 0 { + harness.ds1().ack_barrier().await; + harness.ds2.ack_barrier().await; + harness.ds3.ack_barrier().await; + } + } + + // We should also automatically send a flush here + harness.ds1().ack_flush().await; + harness.ds2.ack_flush().await; + harness.ds3.ack_flush().await; +} + +/// Test that barrier operations are sent periodically, based on cached bytes +#[tokio::test] +async fn test_bytes_based_barrier() { + let mut harness = TestHarness::new_with_opts(TestOpts { + flush_timeout: 0.5, + read_only: false, + disable_backpressure: false, + }) + .await; + + const WRITE_SIZE: usize = 105 * 1024; // 105 KiB + let write_buf = BytesMut::from(vec![1; WRITE_SIZE].as_slice()); // 105 KiB + let barrier_point = + (IO_CACHED_MAX_BYTES as usize).div_ceil(write_buf.len()); + + for i in 1..barrier_point * 3 + 10 { + // Send a write, which will succeed + let write_buf = write_buf.clone(); + let write_handle = harness.spawn(|guest| async move { + guest.write(BlockIndex(0), write_buf).await.unwrap(); + }); + + // Ensure that all three clients got the write request + harness.ds1().ack_write().await; + harness.ds2.ack_write().await; + harness.ds3.ack_write().await; + + write_handle.await.unwrap(); + + if i % barrier_point == 0 { + harness.ds1().ack_barrier().await; + harness.ds2.ack_barrier().await; + harness.ds3.ack_barrier().await; + } + } + + // We should also automatically send a flush here + harness.ds1().ack_flush().await; + harness.ds2.ack_flush().await; + harness.ds3.ack_flush().await; +} diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs index 02df2783d..bc511deb5 100644 --- a/upstairs/src/lib.rs +++ b/upstairs/src/lib.rs @@ -96,7 +96,19 @@ const IO_OUTSTANDING_MAX_BYTES: u64 
= 1024 * 1024 * 1024; // 1 GiB /// /// If we exceed this value, the upstairs will give up and mark that offline /// downstairs as faulted. -pub const IO_OUTSTANDING_MAX_JOBS: usize = 10000; +const IO_OUTSTANDING_MAX_JOBS: usize = 10000; + +/// Maximum number of bytes to cache from complete (but un-flushed) IO +/// +/// Caching complete jobs allows us to replay them if a Downstairs goes offline +/// then comes back. +const IO_CACHED_MAX_BYTES: u64 = 1024 * 1024 * 1024; // 1 GiB + +/// Maximum number of jobs to cache from complete (but un-flushed) IO +/// +/// Caching complete jobs allows us to replay them if a Downstairs goes offline +/// then comes back. +const IO_CACHED_MAX_JOBS: u64 = 10000; /// The BlockIO trait behaves like a physical NVMe disk (or a virtio virtual /// disk): there is no contract about what order operations that are submitted @@ -1275,6 +1287,11 @@ impl IOop { // the downstairs to act based on that. true } + IOop::Barrier { .. } => { + // The Barrier IOop doesn't actually touch any extents; it's + // purely for dependency management. + true + } _ => { panic!("Unsupported IO check {:?}", self); } diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs index 884143e5d..a19a504fa 100644 --- a/upstairs/src/upstairs.rs +++ b/upstairs/src/upstairs.rs @@ -206,12 +206,11 @@ pub(crate) struct Upstairs { /// Marks whether a flush is needed /// - /// The Upstairs keeps all IOs in memory until a flush is ACK'd back from - /// all three downstairs. If there are IOs we have accepted into the work - /// queue that don't end with a flush, then we set this to indicate that the - /// upstairs may need to issue a flush of its own to be sure that data is - /// pushed to disk. Note that this is not an indication of an ACK'd flush, - /// just that the last IO command we put on the work queue was not a flush. 
+ /// If there are IOs we have accepted into the work queue that don't end + /// with a flush, then we set this to indicate that the upstairs may need to + /// issue a flush of its own to be sure that data is pushed to disk. Note + /// that this is not an indication of an ACK'd flush, just that the last IO + /// command we put on the work queue was not a flush. need_flush: bool, /// Statistics for this upstairs @@ -531,6 +530,10 @@ impl Upstairs { /// Apply an action returned from [`Upstairs::select`] pub(crate) fn apply(&mut self, action: UpstairsAction) { + // Check whether the downstairs has live jobs before performing the + // action, because the action may cause it to retire live jobs. + let has_jobs = self.downstairs.has_live_jobs(); + match action { UpstairsAction::Downstairs(d) => { self.counters.action_downstairs += 1; @@ -603,6 +606,12 @@ impl Upstairs { // because too many jobs have piled up. self.gone_too_long(); + // Check whether we need to send a Barrier operation to clean out + // complete-but-unflushed jobs. + if self.downstairs.needs_barrier() { + self.submit_barrier() + } + // Check to see whether live-repair can continue // // This must be called before acking jobs, because it looks in @@ -653,6 +662,13 @@ impl Upstairs { // For now, check backpressure after every event. We may want to make // this more nuanced in the future. self.set_backpressure(); + + // We do this last because some of the code above can be slow + // (especially during debug builds), and we don't want to set our flush + // deadline such that it fires immediately. + if has_jobs { + self.flush_deadline = Instant::now() + self.flush_interval; + } } /// Helper function to await all deferred block requests @@ -1271,13 +1287,11 @@ impl Upstairs { cdt::up__to__ds__flush__start!(|| (ds_id.0)); } - #[allow(dead_code)] // XXX this will be used soon! fn submit_barrier(&mut self) { // Notice that unlike submit_read and submit_write, we do not check for // guest_io_ready here. 
The upstairs itself calls submit_barrier // without the guest being involved; indeed the guest is not allowed to // call it! - let ds_id = self.downstairs.submit_barrier(); cdt::up__to__ds__barrier__start!(|| (ds_id.0));