diff --git a/crates/curp/src/server/storage/wal/pipeline.rs b/crates/curp/src/server/storage/wal/pipeline.rs index 924be0fe0..534cf0daa 100644 --- a/crates/curp/src/server/storage/wal/pipeline.rs +++ b/crates/curp/src/server/storage/wal/pipeline.rs @@ -40,7 +40,6 @@ impl FilePipeline { Self::clean_up(&dir)?; let (file_tx, file_rx) = flume::bounded(1); - let stop_event = Event::new(); let dir_c = dir.clone(); let stopped = Arc::new(AtomicBool::new(false)); let stopped_c = Arc::clone(&stopped); @@ -49,12 +48,18 @@ impl FilePipeline { let _ignore = std::thread::spawn(move || { let mut file_count = 0; loop { - match Self::alloc(&dir_c, file_size, &mut file_count, &stopped_c) { + match Self::alloc(&dir_c, file_size, &mut file_count) { Ok(file) => { if file_tx.send(file).is_err() { // The receiver is already dropped, stop this task break; } + if stopped_c.load(Ordering::Relaxed) { + if let Err(e) = Self::clean_up(&dir_c) { + error!("failed to clean up pipeline temp files: {e}"); + } + break; + } } Err(e) => { error!("failed to allocate file: {e}"); @@ -68,12 +73,18 @@ impl FilePipeline { let _ignore = tokio::spawn(async move { let mut file_count = 0; loop { - match Self::alloc(&dir_c, file_size, &mut file_count, &stopped_c) { + match Self::alloc(&dir_c, file_size, &mut file_count) { Ok(file) => { if file_tx.send_async(file).await.is_err() { // The receiver is already dropped, stop this task break; } + if stopped_c.load(Ordering::Relaxed) { + if let Err(e) = Self::clean_up(&dir_c) { + error!("failed to clean up pipeline temp files: {e}"); + } + break; + } } Err(e) => { error!("failed to allocate file: {e}"); @@ -97,19 +108,11 @@ impl FilePipeline { } /// Allocates a a new tempfile - fn alloc( - dir: &PathBuf, - file_size: u64, - file_count: &mut usize, - stopped: &AtomicBool, - ) -> io::Result { + fn alloc(dir: &PathBuf, file_size: u64, file_count: &mut usize) -> io::Result { let fpath = PathBuf::from(dir).join(format!("{file_count}{TEMP_FILE_EXT}")); let mut file = LockedFile::open_rw(fpath)?; file.preallocate(file_size)?; *file_count = file_count.wrapping_add(1); - if stopped.load(Ordering::Relaxed) { - Self::clean_up(dir)?; - } Ok(file) }