Skip to content

Commit

Permalink
Move blocking flush to a worker thread
Browse files Browse the repository at this point in the history
  • Loading branch information
mkeeter committed Feb 2, 2024
1 parent dc3610a commit 4d6ae20
Showing 1 changed file with 34 additions and 21 deletions.
55 changes: 34 additions & 21 deletions downstairs/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -869,30 +869,43 @@ impl Region {
// - Do the work in the rayon thread pool instead of using tokio tasks
// - Carefully walk self.extents.as_mut_slice() to mutably borrow
// multiple at the same time.

let mut slice_start = 0;
let mut slice = self.extents.as_mut_slice();
let mut results = vec![Ok(()); dirty_extents.len()];
rayon::scope(|s| {
if matches!(
tokio::runtime::Handle::current().runtime_flavor(),
tokio::runtime::RuntimeFlavor::MultiThread
) {
let mut slice_start = 0;
let mut slice = self.extents.as_mut_slice();
tokio::task::block_in_place(|| {
rayon::scope(|s| {
for (eid, r) in dirty_extents.iter().zip(results.iter_mut())
{
let next = eid - slice_start;
slice = &mut slice[next..];
let (extent, rest) = slice.split_first_mut().unwrap();
let ExtentState::Opened(extent) = extent else {
panic!("can't flush closed extent");
};
slice = rest;
slice_start += next + 1;
s.spawn(|_| {
*r = extent.flush(
flush_number,
gen_number,
job_id,
&self.log,
)
});
}
})
});
} else {
let log = self.log.clone();
for (eid, r) in dirty_extents.iter().zip(results.iter_mut()) {
let next = eid - slice_start;
slice = &mut slice[next..];
let (extent, rest) = slice.split_first_mut().unwrap();
let ExtentState::Opened(extent) = extent else {
panic!("can't flush closed extent");
};
slice = rest;
slice_start += next + 1;
s.spawn(|_| {
*r = extent.flush(
flush_number,
gen_number,
job_id,
&self.log,
)
});
let extent = self.get_opened_extent_mut(*eid);
*r = extent.flush(flush_number, gen_number, job_id, &log);
}
});
}

cdt::os__flush__done!(|| job_id.0);

Expand Down

0 comments on commit 4d6ae20

Please sign in to comment.