Skip to content

Commit

Permalink
refactor: move file allocation task to a thread as it may block
Browse files Browse the repository at this point in the history
Signed-off-by: bsbds <[email protected]>
  • Loading branch information
bsbds authored and mergify[bot] committed Mar 5, 2024
1 parent b7705cb commit 19fc16e
Showing 1 changed file with 41 additions and 25 deletions.
66 changes: 41 additions & 25 deletions crates/curp/src/server/storage/wal/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ pub(super) struct FilePipeline {

impl FilePipeline {
/// Creates a new `FilePipeline`
#[allow(clippy::arithmetic_side_effects)] // Introduced by tokio::select! macro
pub(super) fn new(dir: PathBuf, file_size: u64) -> io::Result<Self> {
Self::clean_up(&dir)?;

Expand All @@ -46,28 +45,40 @@ impl FilePipeline {
let stopped = Arc::new(AtomicBool::new(false));
let stopped_c = Arc::clone(&stopped);

let _ignore = tokio::spawn(async move {
#[cfg(not(madsim))]
let _ignore = std::thread::spawn(move || {
let mut file_count = 0;
loop {
let file = match Self::alloc(&dir_c, file_size, file_count) {
Ok(f) => f,
match Self::alloc(&dir_c, file_size, &mut file_count, &stopped_c) {
Ok(file) => {
if file_tx.send(file).is_err() {
// The receiver is already dropped, stop this task
break;
}
}
Err(e) => {
error!("failed to allocate new file: {e}");
error!("failed to allocate file: {e}");
break;
}
};
file_count += 1;

if stopped_c.load(Ordering::Relaxed) {
if let Err(e) = Self::clean_up(&dir_c) {
error!("failed to clean up pipeline files: {e}");
}
break;
}
}
});

if let Err(e) = file_tx.send_async(file).await {
// The receiver is already dropped, stop this task
break;
#[cfg(madsim)]
let _ignore = tokio::spawn(async move {
let mut file_count = 0;
loop {
match Self::alloc(&dir_c, file_size, &mut file_count, &stopped_c) {
Ok(file) => {
if file_tx.send_async(file).await.is_err() {
// The receiver is already dropped, stop this task
break;
}
}
Err(e) => {
error!("failed to allocate file: {e}");
break;
}
}
}
});
Expand All @@ -86,11 +97,20 @@ impl FilePipeline {
}

/// Allocates a a new tempfile
fn alloc(dir: impl AsRef<Path>, file_size: u64, file_count: usize) -> io::Result<LockedFile> {
let fpath = PathBuf::from(dir.as_ref()).join(format!("{file_count}{TEMP_FILE_EXT}"));
let mut locked_file = LockedFile::open_rw(fpath)?;
locked_file.preallocate(file_size)?;
Ok(locked_file)
fn alloc(
dir: &PathBuf,
file_size: u64,
file_count: &mut usize,
stopped: &AtomicBool,
) -> io::Result<LockedFile> {
let fpath = PathBuf::from(dir).join(format!("{file_count}{TEMP_FILE_EXT}"));
let mut file = LockedFile::open_rw(fpath)?;
file.preallocate(file_size)?;
*file_count = file_count.wrapping_add(1);
if stopped.load(Ordering::Relaxed) {
Self::clean_up(dir)?;
}
Ok(file)
}

/// Cleans up all unused tempfiles
Expand Down Expand Up @@ -157,11 +177,7 @@ mod tests {
check_size(file0);
let file1 = pipeline.next().await.unwrap().unwrap();
check_size(file1);
let paths = get_file_paths_with_ext(&dir, TEMP_FILE_EXT).unwrap();
assert_eq!(paths.len(), 2);
pipeline.stop();
assert!(pipeline.next().await.is_none());
let paths_cleaned = get_file_paths_with_ext(dir, TEMP_FILE_EXT).unwrap();
assert_eq!(paths_cleaned.len(), 0);
}
}

0 comments on commit 19fc16e

Please sign in to comment.