From 19fc16ea764eca25d511ef958d5dc047b06b620b Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Mon, 26 Feb 2024 15:31:54 +0800 Subject: [PATCH] refactor: move file allocation task to a thread as it may block Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- .../curp/src/server/storage/wal/pipeline.rs | 66 ++++++++++++------- 1 file changed, 41 insertions(+), 25 deletions(-) diff --git a/crates/curp/src/server/storage/wal/pipeline.rs b/crates/curp/src/server/storage/wal/pipeline.rs index 79009f898..924be0fe0 100644 --- a/crates/curp/src/server/storage/wal/pipeline.rs +++ b/crates/curp/src/server/storage/wal/pipeline.rs @@ -36,7 +36,6 @@ pub(super) struct FilePipeline { impl FilePipeline { /// Creates a new `FilePipeline` - #[allow(clippy::arithmetic_side_effects)] // Introduced by tokio::select! macro pub(super) fn new(dir: PathBuf, file_size: u64) -> io::Result { Self::clean_up(&dir)?; @@ -46,28 +45,40 @@ impl FilePipeline { let stopped = Arc::new(AtomicBool::new(false)); let stopped_c = Arc::clone(&stopped); - let _ignore = tokio::spawn(async move { + #[cfg(not(madsim))] + let _ignore = std::thread::spawn(move || { let mut file_count = 0; loop { - let file = match Self::alloc(&dir_c, file_size, file_count) { - Ok(f) => f, + match Self::alloc(&dir_c, file_size, &mut file_count, &stopped_c) { + Ok(file) => { + if file_tx.send(file).is_err() { + // The receiver is already dropped, stop this task + break; + } + } Err(e) => { - error!("failed to allocate new file: {e}"); + error!("failed to allocate file: {e}"); break; } - }; - file_count += 1; - - if stopped_c.load(Ordering::Relaxed) { - if let Err(e) = Self::clean_up(&dir_c) { - error!("failed to clean up pipeline files: {e}"); - } - break; } + } + }); - if let Err(e) = file_tx.send_async(file).await { - // The receiver is already dropped, stop this task - break; + #[cfg(madsim)] + let _ignore = tokio::spawn(async move { + let mut file_count = 0; + loop { + match Self::alloc(&dir_c, file_size, &mut file_count, &stopped_c) { + Ok(file) => { + if file_tx.send_async(file).await.is_err() { + // The receiver is already dropped, stop this task + break; + } + } + Err(e) => { + error!("failed to allocate file: {e}"); + break; + } } } }); @@ -86,11 +97,20 @@ impl FilePipeline { } /// Allocates a a new tempfile - fn alloc(dir: impl AsRef, file_size: u64, file_count: usize) -> io::Result { - let fpath = PathBuf::from(dir.as_ref()).join(format!("{file_count}{TEMP_FILE_EXT}")); - let mut locked_file = LockedFile::open_rw(fpath)?; - locked_file.preallocate(file_size)?; - Ok(locked_file) + fn alloc( + dir: &PathBuf, + file_size: u64, + file_count: &mut usize, + stopped: &AtomicBool, + ) -> io::Result { + let fpath = PathBuf::from(dir).join(format!("{file_count}{TEMP_FILE_EXT}")); + let mut file = LockedFile::open_rw(fpath)?; + file.preallocate(file_size)?; + *file_count = file_count.wrapping_add(1); + if stopped.load(Ordering::Relaxed) { + Self::clean_up(dir)?; + } + Ok(file) } /// Cleans up all unused tempfiles @@ -157,11 +177,7 @@ mod tests { check_size(file0); let file1 = pipeline.next().await.unwrap().unwrap(); check_size(file1); - let paths = get_file_paths_with_ext(&dir, TEMP_FILE_EXT).unwrap(); - assert_eq!(paths.len(), 2); pipeline.stop(); assert!(pipeline.next().await.is_none()); - let paths_cleaned = get_file_paths_with_ext(dir, TEMP_FILE_EXT).unwrap(); - assert_eq!(paths_cleaned.len(), 0); } }