diff --git a/downstairs/src/lib.rs b/downstairs/src/lib.rs index 492272396..95dcd3bf5 100644 --- a/downstairs/src/lib.rs +++ b/downstairs/src/lib.rs @@ -2,7 +2,7 @@ #![cfg_attr(usdt_need_asm, feature(asm))] #![cfg_attr(all(target_os = "macos", usdt_need_asm_sym), feature(asm_sym))] -use futures::lock::{Mutex, MutexGuard}; +use futures::lock::Mutex; use std::cmp::Ordering; use std::collections::{HashMap, VecDeque}; use std::fmt; @@ -338,7 +338,7 @@ pub async fn downstairs_import + std::fmt::Debug>( /* * Debug function to dump the work list. */ -pub async fn show_work(ds: &mut Downstairs) { +pub fn show_work(ds: &mut Downstairs) { let active_upstairs_connections = ds.active_upstairs(); println!( "Active Upstairs connections: {:?}", @@ -346,7 +346,7 @@ pub async fn show_work(ds: &mut Downstairs) { ); for upstairs_connection in active_upstairs_connections { - let work = ds.work_lock(upstairs_connection).await.unwrap(); + let work = ds.work(upstairs_connection).unwrap(); let mut kvec: Vec = work.active.keys().cloned().collect(); @@ -596,8 +596,8 @@ async fn proc_frame( writes, }; - let d = ad.lock().await; - d.add_work(upstairs_connection, job_id, new_write).await?; + let mut d = ad.lock().await; + d.add_work(upstairs_connection, job_id, new_write)?; Some(job_id) } Message::Flush { @@ -630,8 +630,8 @@ async fn proc_frame( extent_limit, }; - let d = ad.lock().await; - d.add_work(upstairs_connection, job_id, new_flush).await?; + let mut d = ad.lock().await; + d.add_work(upstairs_connection, job_id, new_flush)?; Some(job_id) } Message::WriteUnwritten { @@ -658,8 +658,8 @@ async fn proc_frame( writes, }; - let d = ad.lock().await; - d.add_work(upstairs_connection, job_id, new_write).await?; + let mut d = ad.lock().await; + d.add_work(upstairs_connection, job_id, new_write)?; Some(job_id) } Message::ReadRequest { @@ -686,8 +686,8 @@ async fn proc_frame( requests, }; - let d = ad.lock().await; - d.add_work(upstairs_connection, job_id, new_read).await?; + let mut d = ad.lock().await; + d.add_work(upstairs_connection, job_id, new_read)?; Some(job_id) } // These are for repair while taking live IO @@ -716,8 +716,8 @@ async fn proc_frame( extent: extent_id, }; - let d = ad.lock().await; - d.add_work(upstairs_connection, job_id, ext_close).await?; + let mut d = ad.lock().await; + d.add_work(upstairs_connection, job_id, ext_close)?; Some(job_id) } Message::ExtentLiveFlushClose { @@ -749,8 +749,8 @@ async fn proc_frame( gen_number, }; - let d = ad.lock().await; - d.add_work(upstairs_connection, job_id, new_flush).await?; + let mut d = ad.lock().await; + d.add_work(upstairs_connection, job_id, new_flush)?; Some(job_id) } Message::ExtentLiveRepair { @@ -782,9 +782,9 @@ async fn proc_frame( source_repair_address, }; - let d = ad.lock().await; + let mut d = ad.lock().await; debug!(d.log, "Received ExtentLiveRepair {}", job_id); - d.add_work(upstairs_connection, job_id, new_repair).await?; + d.add_work(upstairs_connection, job_id, new_repair)?; Some(job_id) } Message::ExtentLiveReopen { @@ -811,8 +811,8 @@ async fn proc_frame( extent: extent_id, }; - let d = ad.lock().await; - d.add_work(upstairs_connection, job_id, new_open).await?; + let mut d = ad.lock().await; + d.add_work(upstairs_connection, job_id, new_open)?; Some(job_id) } Message::ExtentLiveNoOp { @@ -834,9 +834,9 @@ async fn proc_frame( cdt::submit__el__noop__start!(|| job_id.0); let new_open = IOop::ExtentLiveNoOp { dependencies }; - let d = ad.lock().await; + let mut d = ad.lock().await; debug!(d.log, "Received NoOP {}", job_id); - d.add_work(upstairs_connection, job_id, new_open).await?; + d.add_work(upstairs_connection, job_id, new_open)?; Some(job_id) } @@ -996,8 +996,7 @@ async fn do_work_task( * are New or DepWait. */ let mut new_work: VecDeque = { - if let Ok(new_work) = - ads.lock().await.new_work(upstairs_connection).await + if let Ok(new_work) = ads.lock().await.new_work(upstairs_connection) { new_work.into_iter().collect() } else { @@ -1027,11 +1026,8 @@ async fn do_work_task( * in_progress method will only return a job if all * dependencies are met. */ - let job_id = ads - .lock() - .await - .in_progress(upstairs_connection, new_id) - .await?; + let job_id = + ads.lock().await.in_progress(upstairs_connection, new_id)?; // If the job's dependencies aren't met, then keep going let Some(job_id) = job_id else { @@ -1094,10 +1090,11 @@ async fn do_work_task( let is_flush = matches!(m, Message::FlushAck { .. }); resp_tx.send(m).await?; - ads.lock() - .await - .complete_work_inner(upstairs_connection, job_id, is_flush) - .await?; + ads.lock().await.complete_work_inner( + upstairs_connection, + job_id, + is_flush, + )?; cdt::work__done!(|| job_id.0); } @@ -1274,7 +1271,7 @@ where // If our upstairs never completed activation, // or some other upstairs activated, we won't // be able to report how many jobs. - match ds.jobs(upstairs_connection).await { + match ds.jobs(upstairs_connection){ Ok(jobs) => { info!( log, @@ -1297,7 +1294,7 @@ where log, "upstairs {:?} was previously \ active, clearing", upstairs_connection); - ds.clear_active(upstairs_connection).await?; + ds.clear_active(upstairs_connection)?; } } else { info!(log, "unknown upstairs disconnected"); @@ -1506,10 +1503,10 @@ where negotiated = NegotiationState::Ready; { - let ds = ads.lock().await; - let mut work = ds.work_lock( + let mut ds = ads.lock().await; + let work = ds.work_mut( upstairs_connection.unwrap(), - ).await?; + )?; work.last_flush = last_flush_number; info!( log, @@ -1818,13 +1815,13 @@ where log, "upstairs {:?} disconnected, {} jobs left", upstairs_connection, - ds.jobs(upstairs_connection).await?, + ds.jobs(upstairs_connection)?, ); if ds.is_active(upstairs_connection) { warn!(log, "upstairs {:?} was previously \ active, clearing", upstairs_connection); - ds.clear_active(upstairs_connection).await?; + ds.clear_active(upstairs_connection)?; } return Ok(()); @@ -1853,7 +1850,7 @@ where #[derive(Debug)] pub struct ActiveUpstairs { pub upstairs_connection: UpstairsConnection, - pub work: Mutex, + pub work: Work, pub terminate_sender: oneshot::Sender, } @@ -1911,49 +1908,43 @@ impl Downstairs { } } - /* - * Only grab the lock if the UpstairsConnection matches. - * - * Multiple Upstairs connecting to this Downstairs will spawn multiple - * threads that all can potentially add work to the same `active` hash - * map. Only one Upstairs can be "active" at any one time though. - * When promote_to_active takes the work lock, it will clear out the - * `active` hash map and (if applicable) will signal to the currently - * active Upstairs to terminate the connection. - * - * `new_work` and `add_work` both grab their work lock through this - * function. Let's say `promote_to_active` and `add_work` are racing for - * the work lock. If `add_work` wins the race it will put work into - * `active`, then `promote_to_active` will clear it out. If - * `promote_to_active` wins the race, it will change the Downstairs' - * active UpstairsConnection, and send the terminate signal to the - * tasks that are communicating to the previously active Upstairs - * (along with terminating the Downstairs tasks). If `add_work` for - * the previous Upstairs then does fire, it will fail to - * grab the lock because the UpstairsConnection is no longer active, and - * that `add_work` thread should close. - * - * Let's say `new_work` and `promote_to_active` are racing. If `new_work` - * wins, then it will return and run those jobs in `do_work_task`. - * However, `promote_to_active` will grab the lock and change the - * active UpstairsConnection, causing `do_work` to return - * UpstairsInactive for the jobs that were just returned. If - * `promote_to_active` wins, it will clear out the jobs of the old - * Upstairs. - * - * Grabbing the lock in this way should properly clear out the previously - * active Upstairs without causing jobs to be incorrectly sent to the - * newly active Upstairs. - */ - async fn work_lock( + /// Mutably borrow a connection's `Work` if the `UpstairsConnection` matches + /// + /// Because this function takes a `&mut self` and returns a `&mut Work` + /// (extending the lifetime of the initial borrow), it is impossible for + /// anyone else to interfere with the work map for the lifetime of the + /// borrow. + fn work_mut( + &mut self, + upstairs_connection: UpstairsConnection, + ) -> Result<&mut Work> { + self.check_upstairs_active(upstairs_connection)?; + let active_upstairs = self + .active_upstairs + .get_mut(&upstairs_connection.upstairs_id) + .unwrap(); + Ok(&mut active_upstairs.work) + } + + /// Borrow a connection's `Work` if the `UpstairsConnection` matches + fn work(&self, upstairs_connection: UpstairsConnection) -> Result<&Work> { + self.check_upstairs_active(upstairs_connection)?; + let active_upstairs = self + .active_upstairs + .get(&upstairs_connection.upstairs_id) + .unwrap(); + Ok(&active_upstairs.work) + } + + fn check_upstairs_active( &self, upstairs_connection: UpstairsConnection, - ) -> Result> { + ) -> Result<()> { let upstairs_uuid = upstairs_connection.upstairs_id; if !self.active_upstairs.contains_key(&upstairs_uuid) { warn!( self.log, - "{:?} cannot grab work lock, {} is not active!", + "{:?} cannot get active upstairs, {} is not active!", upstairs_connection, upstairs_uuid, ); @@ -1966,36 +1957,32 @@ impl Downstairs { if active_upstairs.upstairs_connection != upstairs_connection { warn!( self.log, - "{:?} cannot grab lock, does not match {:?}!", + "{:?} cannot get active upstairs, does not match {:?}!", upstairs_connection, active_upstairs.upstairs_connection, ); bail!(CrucibleError::UpstairsInactive) } - - Ok(active_upstairs.work.lock().await) + Ok(()) } - async fn jobs( - &self, - upstairs_connection: UpstairsConnection, - ) -> Result { - let work = self.work_lock(upstairs_connection).await?; + fn jobs(&self, upstairs_connection: UpstairsConnection) -> Result { + let work = self.work(upstairs_connection)?; Ok(work.jobs()) } - async fn new_work( + fn new_work( &self, upstairs_connection: UpstairsConnection, ) -> Result> { - let work = self.work_lock(upstairs_connection).await?; + let work = self.work(upstairs_connection)?; Ok(work.new_work(upstairs_connection)) } // Add work to the Downstairs - async fn add_work( - &self, + fn add_work( + &mut self, upstairs_connection: UpstairsConnection, ds_id: JobId, work: IOop, @@ -2027,31 +2014,31 @@ impl Downstairs { state: WorkState::New, }; - let mut work = self.work_lock(upstairs_connection).await?; + let work = self.work_mut(upstairs_connection)?; work.add_work(ds_id, dsw); Ok(()) } #[cfg(test)] - async fn get_job( + fn get_job( &self, upstairs_connection: UpstairsConnection, ds_id: JobId, ) -> Result { - let mut work = self.work_lock(upstairs_connection).await?; + let work = self.work(upstairs_connection)?; Ok(work.get_job(ds_id)) } // Downstairs, move a job to in_progress, if we can - async fn in_progress( - &self, + fn in_progress( + &mut self, upstairs_connection: UpstairsConnection, ds_id: JobId, ) -> Result> { let job = { let log = self.log.new(o!("role" => "work".to_string())); - let mut work = self.work_lock(upstairs_connection).await?; + let work = self.work_mut(upstairs_connection)?; work.in_progress(ds_id, log) }; @@ -2082,7 +2069,7 @@ impl Downstairs { job_id: JobId, ) -> Result> { let job = { - let mut work = self.work_lock(upstairs_connection).await?; + let work = self.work_mut(upstairs_connection)?; let job = work.get_ready_job(job_id); // `promote_to_active` can clear out the Work struct for this @@ -2379,15 +2366,14 @@ impl Downstairs { /// Helper function to call `complete_work` if the `Message` is available #[cfg(test)] - async fn complete_work( - &self, + fn complete_work( + &mut self, upstairs_connection: UpstairsConnection, ds_id: JobId, m: Message, ) -> Result<()> { let is_flush = matches!(m, Message::FlushAck { .. }); self.complete_work_inner(upstairs_connection, ds_id, is_flush) - .await } /* @@ -2397,15 +2383,15 @@ impl Downstairs { * - removing the response * - putting the id on the completed list. */ - async fn complete_work_inner( - &self, + fn complete_work_inner( + &mut self, upstairs_connection: UpstairsConnection, ds_id: JobId, is_flush: bool, ) -> Result<()> { - let mut work = self.work_lock(upstairs_connection).await?; + let work = self.work_mut(upstairs_connection)?; - // If upstairs_connection grabs the work lock, it is the active + // If upstairs_connection borrows the work map, it is the active // connection for this Upstairs UUID. The job should exist in the Work // struct. If it does not, then we're in the case where the same // Upstairs has reconnected and been promoted to active, meaning @@ -2482,14 +2468,14 @@ impl Downstairs { if self.read_only { // Multiple active read-only sessions are allowed, but multiple // sessions for the same Upstairs UUID are not. Kick out a - // previously active session for this UUID if one exists. Do this - // while holding the work lock so the previously active Upstairs + // previously active session for this UUID if one exists. This + // function is called on a `&mut self`, so we're guaranteed that the // isn't adding more work. if let Some(active_upstairs) = self .active_upstairs .remove(&upstairs_connection.upstairs_id) { - let mut work = active_upstairs.work.lock().await; + let work = &active_upstairs.work; info!( self.log, @@ -2539,7 +2525,6 @@ impl Downstairs { // working on outstanding jobs, or a way to merge. But for now, // we just throw out what we have and let the upstairs resend // anything to us that it did not get an ACK for. - work.clear(); } else { // There is no current session for this Upstairs UUID. } @@ -2550,7 +2535,7 @@ impl Downstairs { upstairs_connection.upstairs_id, ActiveUpstairs { upstairs_connection, - work: Mutex::new(Work::new()), + work: Work::new(), terminate_sender: tx, }, ); @@ -2569,7 +2554,7 @@ impl Downstairs { upstairs_connection.upstairs_id, ActiveUpstairs { upstairs_connection, - work: Mutex::new(Work::new()), + work: Work::new(), terminate_sender: tx, }, ); @@ -2647,7 +2632,7 @@ impl Downstairs { .remove(¤tly_active_upstairs_uuids[0]) .unwrap(); - let mut work = active_upstairs.work.lock().await; + let work = &active_upstairs.work; warn!( self.log, @@ -2700,7 +2685,6 @@ impl Downstairs { // But for now, we just throw out what we have and let the // upstairs resend anything to us that it did not get an ACK // for. - work.clear(); // Insert or replace the session @@ -2708,7 +2692,7 @@ impl Downstairs { upstairs_connection.upstairs_id, ActiveUpstairs { upstairs_connection, - work: Mutex::new(Work::new()), + work: Work::new(), terminate_sender: tx, }, ); @@ -2754,13 +2738,12 @@ impl Downstairs { .collect() } - async fn clear_active( + fn clear_active( &mut self, upstairs_connection: UpstairsConnection, ) -> Result<()> { - let mut work = self.work_lock(upstairs_connection).await?; + let work = self.work_mut(upstairs_connection)?; work.clear(); - drop(work); self.active_upstairs .remove(&upstairs_connection.upstairs_id); @@ -2843,7 +2826,7 @@ impl Work { } #[cfg(test)] - fn get_job(&mut self, ds_id: JobId) -> DownstairsWork { + fn get_job(&self, ds_id: JobId) -> DownstairsWork { self.active.get(&ds_id).unwrap().clone() } @@ -2872,9 +2855,9 @@ impl Work { if let Some(job) = self.active.get_mut(&ds_id) { if job.state == WorkState::New || job.state == WorkState::DepWait { /* - * Before we can make this in_progress, we have to, while - * holding this locked, check the dep list if there is one - * and make sure all dependencies are completed. + * Before we can make this in_progress, we have to check the dep + * list if there is one and make sure all dependencies are + * completed. */ let dep_list = job.work.deps(); @@ -3643,7 +3626,7 @@ mod test { offset: Block::new_512(1), }], }; - ds.add_work(upstairs_connection, JobId(1000), rio).await?; + ds.add_work(upstairs_connection, JobId(1000), rio)?; let deps = vec![JobId(1000)]; let rio = IOop::Read { @@ -3653,25 +3636,24 @@ mod test { offset: Block::new_512(1), }], }; - ds.add_work(upstairs_connection, JobId(1001), rio).await?; + ds.add_work(upstairs_connection, JobId(1001), rio)?; - show_work(&mut ds).await; + show_work(&mut ds); // Now we mimic what happens in the do_work_task() - let new_work = ds.new_work(upstairs_connection).await.unwrap(); + let new_work = ds.new_work(upstairs_connection).unwrap(); println!("Got new work: {:?}", new_work); assert_eq!(new_work.len(), 2); for id in new_work.iter() { - let ip_id = - ds.in_progress(upstairs_connection, *id).await?.unwrap(); + let ip_id = ds.in_progress(upstairs_connection, *id)?.unwrap(); assert_eq!(ip_id, *id); println!("Do IOop {}", *id); let m = ds.do_work(upstairs_connection, *id).await?.unwrap(); println!("Got m: {:?}", m); - ds.complete_work(upstairs_connection, *id, m).await?; + ds.complete_work(upstairs_connection, *id, m)?; } - show_work(&mut ds).await; + show_work(&mut ds); Ok(()) } @@ -3746,7 +3728,7 @@ mod test { dependencies: Vec::new(), extent: 0, }; - ds.add_work(upstairs_connection, JobId(1000), rio).await?; + ds.add_work(upstairs_connection, JobId(1000), rio)?; let rio = IOop::ExtentFlushClose { dependencies: vec![], @@ -3754,7 +3736,7 @@ mod test { flush_number: 1, gen_number: 2, }; - ds.add_work(upstairs_connection, JobId(1001), rio).await?; + ds.add_work(upstairs_connection, JobId(1001), rio)?; let deps = vec![JobId(1000), JobId(1001)]; let rio = IOop::Read { @@ -3764,38 +3746,37 @@ mod test { offset: Block::new_512(1), }], }; - ds.add_work(upstairs_connection, JobId(1002), rio).await?; + ds.add_work(upstairs_connection, JobId(1002), rio)?; let deps = vec![JobId(1000), JobId(1001), JobId(1002)]; let rio = IOop::ExtentLiveNoOp { dependencies: deps }; - ds.add_work(upstairs_connection, JobId(1003), rio).await?; + ds.add_work(upstairs_connection, JobId(1003), rio)?; let deps = vec![JobId(1000), JobId(1001), JobId(1002), JobId(1003)]; let rio = IOop::ExtentLiveReopen { dependencies: deps, extent: 0, }; - ds.add_work(upstairs_connection, JobId(1004), rio).await?; + ds.add_work(upstairs_connection, JobId(1004), rio)?; println!("Before doing work we have:"); - show_work(&mut ds).await; + show_work(&mut ds); // Now we mimic what happens in the do_work_task() - let new_work = ds.new_work(upstairs_connection).await.unwrap(); + let new_work = ds.new_work(upstairs_connection).unwrap(); println!("Got new work: {:?}", new_work); assert_eq!(new_work.len(), 5); for id in new_work.iter() { - let ip_id = - ds.in_progress(upstairs_connection, *id).await?.unwrap(); + let ip_id = ds.in_progress(upstairs_connection, *id)?.unwrap(); assert_eq!(ip_id, *id); println!("Do IOop {}", *id); let m = ds.do_work(upstairs_connection, *id).await?.unwrap(); println!("Got m: {:?}", m); - ds.complete_work(upstairs_connection, *id, m).await?; + ds.complete_work(upstairs_connection, *id, m)?; } - let new_work = ds.new_work(upstairs_connection).await.unwrap(); + let new_work = ds.new_work(upstairs_connection).unwrap(); assert_eq!(new_work.len(), 0); Ok(()) } @@ -3832,7 +3813,7 @@ mod test { dependencies: Vec::new(), extent: 0, }; - ds.add_work(upstairs_connection, JobId(1000), rio).await?; + ds.add_work(upstairs_connection, JobId(1000), rio)?; let rio = IOop::ExtentFlushClose { dependencies: vec![], @@ -3840,29 +3821,27 @@ mod test { flush_number: 1, gen_number: gen, }; - ds.add_work(upstairs_connection, JobId(1001), rio).await?; + ds.add_work(upstairs_connection, JobId(1001), rio)?; // Add the two reopen commands for the two extents we closed. let rio = IOop::ExtentLiveReopen { dependencies: vec![JobId(1000)], extent: 0, }; - ds.add_work(upstairs_connection, JobId(1002), rio).await?; + ds.add_work(upstairs_connection, JobId(1002), rio)?; let rio = IOop::ExtentLiveReopen { dependencies: vec![JobId(1001)], extent: 1, }; - ds.add_work(upstairs_connection, JobId(1003), rio).await?; - show_work(&mut ds).await; + ds.add_work(upstairs_connection, JobId(1003), rio)?; + show_work(&mut ds); - let new_work = ds.new_work(upstairs_connection).await.unwrap(); + let new_work = ds.new_work(upstairs_connection).unwrap(); println!("Got new work: {:?}", new_work); assert_eq!(new_work.len(), 4); // Process the ExtentClose - ds.in_progress(upstairs_connection, JobId(1000)) - .await? - .unwrap(); + ds.in_progress(upstairs_connection, JobId(1000))?.unwrap(); let m = ds.do_work(upstairs_connection, JobId(1000)).await?.unwrap(); // Verify that we not only have composed the correct ACK, but the // result inside that ACK is also what we expect. In this case @@ -3888,13 +3867,10 @@ mod test { panic!("Incorrect message: {:?}", m); } } - ds.complete_work(upstairs_connection, JobId(1000), m) - .await?; + ds.complete_work(upstairs_connection, JobId(1000), m)?; // Process the ExtentFlushClose - ds.in_progress(upstairs_connection, JobId(1001)) - .await? - .unwrap(); + ds.in_progress(upstairs_connection, JobId(1001))?.unwrap(); let m = ds.do_work(upstairs_connection, JobId(1001)).await?.unwrap(); // Verify that we not only have composed the correct ACK, but the // result inside that ACK is also what we expect. In this case @@ -3920,12 +3896,11 @@ mod test { panic!("Incorrect message: {:?}", m); } } - ds.complete_work(upstairs_connection, JobId(1001), m) - .await?; + ds.complete_work(upstairs_connection, JobId(1001), m)?; // Process the two ExtentReopen commands for id in (1002..=1003).map(JobId) { - ds.in_progress(upstairs_connection, id).await?.unwrap(); + ds.in_progress(upstairs_connection, id)?.unwrap(); let m = ds.do_work(upstairs_connection, id).await?.unwrap(); match m { Message::ExtentLiveAckId { @@ -3943,11 +3918,11 @@ mod test { panic!("Incorrect message: {:?} for id: {}", m, id); } } - ds.complete_work(upstairs_connection, id, m).await?; + ds.complete_work(upstairs_connection, id, m)?; } // Nothing should be left on the queue. - let new_work = ds.new_work(upstairs_connection).await.unwrap(); + let new_work = ds.new_work(upstairs_connection).unwrap(); assert_eq!(new_work.len(), 0); Ok(()) } @@ -4011,7 +3986,7 @@ mod test { dependencies: Vec::new(), writes, }; - ds.add_work(upstairs_connection, JobId(1000), rio).await?; + ds.add_work(upstairs_connection, JobId(1000), rio)?; // add work for flush 1001 let rio = IOop::Flush { @@ -4021,7 +3996,7 @@ mod test { snapshot_details: None, extent_limit: None, }; - ds.add_work(upstairs_connection, JobId(1001), rio).await?; + ds.add_work(upstairs_connection, JobId(1001), rio)?; // Add work for 2nd write 1002 let writes = create_generic_test_write(eid); @@ -4030,52 +4005,41 @@ mod test { dependencies: vec![JobId(1000), JobId(1001)], writes, }; - ds.add_work(upstairs_connection, JobId(1002), rio).await?; + ds.add_work(upstairs_connection, JobId(1002), rio)?; // Now close the extent let rio = IOop::ExtentClose { dependencies: vec![JobId(1000), JobId(1001), JobId(1002)], extent: eid as usize, }; - ds.add_work(upstairs_connection, JobId(1003), rio).await?; + ds.add_work(upstairs_connection, JobId(1003), rio)?; - show_work(&mut ds).await; + show_work(&mut ds); - let new_work = ds.new_work(upstairs_connection).await.unwrap(); + let new_work = ds.new_work(upstairs_connection).unwrap(); println!("Got new work: {:?}", new_work); assert_eq!(new_work.len(), 4); // Process the first Write - ds.in_progress(upstairs_connection, JobId(1000)) - .await? - .unwrap(); + ds.in_progress(upstairs_connection, JobId(1000))?.unwrap(); let m = ds.do_work(upstairs_connection, JobId(1000)).await?.unwrap(); - ds.complete_work(upstairs_connection, JobId(1000), m) - .await?; + ds.complete_work(upstairs_connection, JobId(1000), m)?; // Process the flush - ds.in_progress(upstairs_connection, JobId(1001)) - .await? - .unwrap(); + ds.in_progress(upstairs_connection, JobId(1001))?.unwrap(); let m = ds.do_work(upstairs_connection, JobId(1001)).await?.unwrap(); - ds.complete_work(upstairs_connection, JobId(1001), m) - .await?; + ds.complete_work(upstairs_connection, JobId(1001), m)?; // Process write 2 - ds.in_progress(upstairs_connection, JobId(1002)) - .await? - .unwrap(); + ds.in_progress(upstairs_connection, JobId(1002))?.unwrap(); let m = ds.do_work(upstairs_connection, JobId(1002)).await?.unwrap(); - ds.complete_work(upstairs_connection, JobId(1002), m) - .await?; + ds.complete_work(upstairs_connection, JobId(1002), m)?; - let new_work = ds.new_work(upstairs_connection).await.unwrap(); + let new_work = ds.new_work(upstairs_connection).unwrap(); assert_eq!(new_work.len(), 1); // Process the ExtentClose - ds.in_progress(upstairs_connection, JobId(1003)) - .await? - .unwrap(); + ds.in_progress(upstairs_connection, JobId(1003))?.unwrap(); let m = ds.do_work(upstairs_connection, JobId(1003)).await?.unwrap(); // Verify that we not only have composed the correct ACK, but the // result inside that ACK is also what we expect. In this case @@ -4101,11 +4065,10 @@ mod test { panic!("Incorrect message: {:?}", m); } } - ds.complete_work(upstairs_connection, JobId(1003), m) - .await?; + ds.complete_work(upstairs_connection, JobId(1003), m)?; // Nothing should be left on the queue. - let new_work = ds.new_work(upstairs_connection).await.unwrap(); + let new_work = ds.new_work(upstairs_connection).unwrap(); assert_eq!(new_work.len(), 0); Ok(()) } @@ -4143,24 +4106,22 @@ mod test { dependencies: Vec::new(), writes, }; - ds.add_work(upstairs_connection, JobId(1000), rio).await?; + ds.add_work(upstairs_connection, JobId(1000), rio)?; let rio = IOop::ExtentClose { dependencies: vec![JobId(1000)], extent: eid as usize, }; - ds.add_work(upstairs_connection, JobId(1001), rio).await?; + ds.add_work(upstairs_connection, JobId(1001), rio)?; - show_work(&mut ds).await; + show_work(&mut ds); - let new_work = ds.new_work(upstairs_connection).await.unwrap(); + let new_work = ds.new_work(upstairs_connection).unwrap(); println!("Got new work: {:?}", new_work); assert_eq!(new_work.len(), 2); // Process the Write - ds.in_progress(upstairs_connection, JobId(1000)) - .await? - .unwrap(); + ds.in_progress(upstairs_connection, JobId(1000))?.unwrap(); let m = ds.do_work(upstairs_connection, JobId(1000)).await?.unwrap(); // Verify that we not only have composed the correct ACK, but the // result inside that ACK is also what we expect. @@ -4180,16 +4141,13 @@ mod test { panic!("Incorrect message: {:?}", m); } } - ds.complete_work(upstairs_connection, JobId(1000), m) - .await?; + ds.complete_work(upstairs_connection, JobId(1000), m)?; - let new_work = ds.new_work(upstairs_connection).await.unwrap(); + let new_work = ds.new_work(upstairs_connection).unwrap(); assert_eq!(new_work.len(), 1); // Process the ExtentClose - ds.in_progress(upstairs_connection, JobId(1001)) - .await? - .unwrap(); + ds.in_progress(upstairs_connection, JobId(1001))?.unwrap(); let m = ds.do_work(upstairs_connection, JobId(1001)).await?.unwrap(); // Verify that we not only have composed the correct ACK, but the // result inside that ACK is also what we expect. In this case @@ -4215,11 +4173,10 @@ mod test { panic!("Incorrect message: {:?}", m); } } - ds.complete_work(upstairs_connection, JobId(1001), m) - .await?; + ds.complete_work(upstairs_connection, JobId(1001), m)?; // Nothing should be left on the queue. - let new_work = ds.new_work(upstairs_connection).await.unwrap(); + let new_work = ds.new_work(upstairs_connection).unwrap(); assert_eq!(new_work.len(), 0); Ok(()) } @@ -4259,7 +4216,7 @@ mod test { dependencies: Vec::new(), writes, }; - ds.add_work(upstairs_connection, JobId(1000), rio).await?; + ds.add_work(upstairs_connection, JobId(1000), rio)?; let rio = IOop::ExtentFlushClose { dependencies: vec![JobId(1000)], @@ -4267,18 +4224,16 @@ mod test { flush_number: 3, gen_number: gen, }; - ds.add_work(upstairs_connection, JobId(1001), rio).await?; + ds.add_work(upstairs_connection, JobId(1001), rio)?; - show_work(&mut ds).await; + show_work(&mut ds); - let new_work = ds.new_work(upstairs_connection).await.unwrap(); + let new_work = ds.new_work(upstairs_connection).unwrap(); println!("Got new work: {:?}", new_work); assert_eq!(new_work.len(), 2); // Process the Write - ds.in_progress(upstairs_connection, JobId(1000)) - .await? - .unwrap(); + ds.in_progress(upstairs_connection, JobId(1000))?.unwrap(); let m = ds.do_work(upstairs_connection, JobId(1000)).await?.unwrap(); // Verify that we not only have composed the correct ACK, but the // result inside that ACK is also what we expect. @@ -4298,16 +4253,13 @@ mod test { panic!("Incorrect message: {:?}", m); } } - ds.complete_work(upstairs_connection, JobId(1000), m) - .await?; + ds.complete_work(upstairs_connection, JobId(1000), m)?; - let new_work = ds.new_work(upstairs_connection).await.unwrap(); + let new_work = ds.new_work(upstairs_connection).unwrap(); assert_eq!(new_work.len(), 1); // Process the ExtentFlushClose - ds.in_progress(upstairs_connection, JobId(1001)) - .await? - .unwrap(); + ds.in_progress(upstairs_connection, JobId(1001))?.unwrap(); let m = ds.do_work(upstairs_connection, JobId(1001)).await?.unwrap(); // Verify that we not only have composed the correct ACK, but the // result inside that ACK is also what we expect. In this case @@ -4333,11 +4285,10 @@ mod test { panic!("Incorrect message: {:?}", m); } } - ds.complete_work(upstairs_connection, JobId(1001), m) - .await?; + ds.complete_work(upstairs_connection, JobId(1001), m)?; // Nothing should be left on the queue. - let new_work = ds.new_work(upstairs_connection).await.unwrap(); + let new_work = ds.new_work(upstairs_connection).unwrap(); assert_eq!(new_work.len(), 0); Ok(()) } @@ -4376,7 +4327,7 @@ mod test { dependencies: Vec::new(), writes, }; - ds.add_work(upstairs_connection, JobId(1000), rio).await?; + ds.add_work(upstairs_connection, JobId(1000), rio)?; // Create the write for extent 2 let writes = create_generic_test_write(eid_two); @@ -4384,7 +4335,7 @@ mod test { dependencies: Vec::new(), writes, }; - ds.add_work(upstairs_connection, JobId(1001), rio).await?; + ds.add_work(upstairs_connection, JobId(1001), rio)?; // Flush and close extent 1 let rio = IOop::ExtentFlushClose { @@ -4393,32 +4344,30 @@ mod test { flush_number: 6, gen_number: gen, }; - ds.add_work(upstairs_connection, JobId(1002), rio).await?; + ds.add_work(upstairs_connection, JobId(1002), rio)?; // Just close extent 2 let rio = IOop::ExtentClose { dependencies: vec![JobId(1001)], extent: eid_two as usize, }; - ds.add_work(upstairs_connection, JobId(1003), rio).await?; + ds.add_work(upstairs_connection, JobId(1003), rio)?; - show_work(&mut ds).await; + show_work(&mut ds); - let new_work = ds.new_work(upstairs_connection).await.unwrap(); + let new_work = ds.new_work(upstairs_connection).unwrap(); println!("Got new work: {:?}", new_work); assert_eq!(new_work.len(), 4); // Process the Writes for id in (1000..=1001).map(JobId) { - ds.in_progress(upstairs_connection, id).await?.unwrap(); + ds.in_progress(upstairs_connection, id)?.unwrap(); let m = ds.do_work(upstairs_connection, id).await?.unwrap(); - ds.complete_work(upstairs_connection, id, m).await?; + ds.complete_work(upstairs_connection, id, m)?; } // Process the ExtentFlushClose - ds.in_progress(upstairs_connection, JobId(1002)) - .await? - .unwrap(); + ds.in_progress(upstairs_connection, JobId(1002))?.unwrap(); let m = ds.do_work(upstairs_connection, JobId(1002)).await?.unwrap(); // Verify that we not only have composed the correct ACK, but the // result inside that ACK is also what we expect. In this case @@ -4444,13 +4393,10 @@ mod test { panic!("Incorrect message: {:?}", m); } } - ds.complete_work(upstairs_connection, JobId(1002), m) - .await?; + ds.complete_work(upstairs_connection, JobId(1002), m)?; // Process the ExtentClose - ds.in_progress(upstairs_connection, JobId(1003)) - .await? - .unwrap(); + ds.in_progress(upstairs_connection, JobId(1003))?.unwrap(); let m = ds.do_work(upstairs_connection, JobId(1003)).await?.unwrap(); // Verify that we not only have composed the correct ACK, but the // result inside that ACK is also what we expect. In this case @@ -4476,10 +4422,9 @@ mod test { panic!("Incorrect message: {:?}", m); } } - ds.complete_work(upstairs_connection, JobId(1003), m) - .await?; + ds.complete_work(upstairs_connection, JobId(1003), m)?; // Nothing should be left on the queue. - let new_work = ds.new_work(upstairs_connection).await.unwrap(); + let new_work = ds.new_work(upstairs_connection).unwrap(); assert_eq!(new_work.len(), 0); Ok(()) } @@ -5939,8 +5884,7 @@ mod test { offset: Block::new_512(1), }], }; - ds.add_work(upstairs_connection_1, JobId(1000), read_1.clone()) - .await?; + ds.add_work(upstairs_connection_1, JobId(1000), read_1.clone())?; let read_2 = IOop::Read { dependencies: Vec::new(), @@ -5949,16 +5893,15 @@ mod test { offset: Block::new_512(2), }], }; - ds.add_work(upstairs_connection_2, JobId(1000), read_2.clone()) - .await?; + ds.add_work(upstairs_connection_2, JobId(1000), read_2.clone())?; - let work_1 = ds.new_work(upstairs_connection_1).await?; - let work_2 = ds.new_work(upstairs_connection_2).await?; + let work_1 = ds.new_work(upstairs_connection_1)?; + let work_2 = ds.new_work(upstairs_connection_2)?; assert_eq!(work_1, work_2); - let job_1 = ds.get_job(upstairs_connection_1, JobId(1000)).await?; - let job_2 = ds.get_job(upstairs_connection_2, JobId(1000)).await?; + let job_1 = ds.get_job(upstairs_connection_1, JobId(1000))?; + let job_2 = ds.get_job(upstairs_connection_2, JobId(1000))?; assert_eq!(job_1.upstairs_connection, upstairs_connection_1); assert_eq!(job_1.ds_id, JobId(1000)); @@ -6031,16 +5974,14 @@ mod test { offset: Block::new_512(1), }], }; - ds.add_work(upstairs_connection_1, JobId(1000), rio).await?; + ds.add_work(upstairs_connection_1, JobId(1000), rio)?; // Now we mimic what happens in the do_work_task() - let new_work = ds.new_work(upstairs_connection_1).await.unwrap(); + let new_work = ds.new_work(upstairs_connection_1).unwrap(); assert_eq!(new_work.len(), 1); - let ip_id = ds - .in_progress(upstairs_connection_1, JobId(1000)) - .await? - .unwrap(); + let ip_id = + ds.in_progress(upstairs_connection_1, JobId(1000))?.unwrap(); assert_eq!(ip_id, JobId(1000)); let m = ds .do_work(upstairs_connection_1, JobId(1000)) @@ -6061,10 +6002,8 @@ mod test { assert_eq!(rx1.try_recv().unwrap(), upstairs_connection_2); // This should error with UpstairsInactive - upstairs_connection_1 isn't - // active anymore and can't grab the work lock. - let result = ds - .complete_work(upstairs_connection_1, JobId(1000), m) - .await; + // active anymore and can't borrow the work map. + let result = ds.complete_work(upstairs_connection_1, JobId(1000), m); assert!(matches!( result.unwrap_err().downcast::().unwrap(), CrucibleError::UpstairsInactive, @@ -6131,16 +6070,14 @@ mod test { offset: Block::new_512(1), }], }; - ds.add_work(upstairs_connection_1, JobId(1000), rio).await?; + ds.add_work(upstairs_connection_1, JobId(1000), rio)?; // Now we mimic what happens in the do_work_task() - let new_work = ds.new_work(upstairs_connection_1).await.unwrap(); + let new_work = ds.new_work(upstairs_connection_1).unwrap(); assert_eq!(new_work.len(), 1); - let ip_id = ds - .in_progress(upstairs_connection_1, JobId(1000)) - .await? - .unwrap(); + let ip_id = + ds.in_progress(upstairs_connection_1, JobId(1000))?.unwrap(); assert_eq!(ip_id, JobId(1000)); let m = ds .do_work(upstairs_connection_1, JobId(1000)) @@ -6161,10 +6098,8 @@ mod test { assert_eq!(rx1.try_recv().unwrap(), upstairs_connection_2); // This should error with UpstairsInactive - upstairs_connection_1 isn't - // active anymore and can't grab the work lock. - let result = ds - .complete_work(upstairs_connection_1, JobId(1000), m) - .await; + // active anymore and can't borrow the work map. + let result = ds.complete_work(upstairs_connection_1, JobId(1000), m); assert!(matches!( result.unwrap_err().downcast::().unwrap(), CrucibleError::UpstairsInactive, @@ -6231,16 +6166,14 @@ mod test { offset: Block::new_512(1), }], }; - ds.add_work(upstairs_connection_1, JobId(1000), rio).await?; + ds.add_work(upstairs_connection_1, JobId(1000), rio)?; // Now we mimic what happens in the do_work_task() - let new_work = ds.new_work(upstairs_connection_1).await.unwrap(); + let new_work = ds.new_work(upstairs_connection_1).unwrap(); assert_eq!(new_work.len(), 1); - let ip_id = ds - .in_progress(upstairs_connection_1, JobId(1000)) - .await? - .unwrap(); + let ip_id = + ds.in_progress(upstairs_connection_1, JobId(1000))?.unwrap(); assert_eq!(ip_id, JobId(1000)); let m = ds .do_work(upstairs_connection_1, JobId(1000)) @@ -6262,9 +6195,7 @@ mod test { // If the original set of tasks don't end right away, they'll try to run // complete_work: - let result = ds - .complete_work(upstairs_connection_1, JobId(1000), m) - .await; + let result = ds.complete_work(upstairs_connection_1, JobId(1000), m); // `complete_work` will return Ok(()) despite not doing anything to the // Work struct.