From 2824ef87c95504aacbdcf33e3e0d5049dfa8e130 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Fri, 19 Apr 2024 15:02:29 +0800 Subject: [PATCH] refactor: wal pipeline will exit task on drop Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- .../curp/src/server/storage/wal/pipeline.rs | 118 +++++++++++------- 1 file changed, 71 insertions(+), 47 deletions(-) diff --git a/crates/curp/src/server/storage/wal/pipeline.rs b/crates/curp/src/server/storage/wal/pipeline.rs index a36d761ce..de29d7949 100644 --- a/crates/curp/src/server/storage/wal/pipeline.rs +++ b/crates/curp/src/server/storage/wal/pipeline.rs @@ -6,6 +6,7 @@ use std::{ Arc, }, task::Poll, + thread::JoinHandle, }; use clippy_utilities::OverflowArithmetic; @@ -13,7 +14,6 @@ use event_listener::Event; use flume::r#async::RecvStream; use futures::{FutureExt, StreamExt}; use thiserror::Error; -use tokio::task::JoinHandle; use tokio_stream::Stream; use tracing::error; @@ -28,15 +28,17 @@ pub(super) struct FilePipeline { dir: PathBuf, /// The size of the temp file file_size: u64, - /// The file receive stream - file_stream: flume::IntoIter, + /// The file receive iterator + file_iter: Option>, /// Stopped flag stopped: Arc, + /// Join handle of the allocation task + file_alloc_task_handle: Option>, } impl FilePipeline { /// Creates a new `FilePipeline` - pub(super) fn new(dir: PathBuf, file_size: u64) -> io::Result { + pub(super) fn new(dir: PathBuf, file_size: u64) -> Self { if let Err(e) = Self::clean_up(&dir) { error!("Failed to clean up tmp files: {e}"); } @@ -47,61 +49,74 @@ impl FilePipeline { let stopped_c = Arc::clone(&stopped); #[cfg(not(madsim))] - let _ignore = std::thread::spawn(move || { - let mut file_count = 0; - loop { - match Self::alloc(&dir_c, file_size, &mut file_count) { - Ok(file) => { - if file_tx.send(file).is_err() { - // The receiver is already dropped, stop this task - break; - } - if stopped_c.load(Ordering::Relaxed) { - if let Err(e) = Self::clean_up(&dir_c) { - error!("failed to clean up pipeline temp files: {e}"); + { + let file_alloc_task_handle = std::thread::spawn(move || { + let mut file_count = 0; + loop { + match Self::alloc(&dir_c, file_size, &mut file_count) { + Ok(file) => { + if file_tx.send(file).is_err() { + // The receiver is already dropped, stop this task + break; } + if stopped_c.load(Ordering::Relaxed) { + if let Err(e) = Self::clean_up(&dir_c) { + error!("failed to clean up pipeline temp files: {e}"); + } + break; + } + } + Err(e) => { + error!("failed to allocate file: {e}"); break; } } - Err(e) => { - error!("failed to allocate file: {e}"); - break; - } } + }); + + Self { + dir, + file_size, + file_iter: Some(file_rx.into_iter()), + stopped, + file_alloc_task_handle: Some(file_alloc_task_handle), } - }); + } #[cfg(madsim)] - let _ignore = tokio::spawn(async move { - let mut file_count = 0; - loop { - match Self::alloc(&dir_c, file_size, &mut file_count) { - Ok(file) => { - if file_tx.send_async(file).await.is_err() { - // The receiver is already dropped, stop this task - break; - } - if stopped_c.load(Ordering::Relaxed) { - if let Err(e) = Self::clean_up(&dir_c) { - error!("failed to clean up pipeline temp files: {e}"); + { + let _ignore = tokio::spawn(async move { + let mut file_count = 0; + loop { + match Self::alloc(&dir_c, file_size, &mut file_count) { + Ok(file) => { + if file_tx.send_async(file).await.is_err() { + // The receiver is already dropped, stop this task + break; + } + if stopped_c.load(Ordering::Relaxed) { + if let Err(e) = Self::clean_up(&dir_c) { + error!("failed to clean up pipeline temp files: {e}"); + } + break; } + } + Err(e) => { + error!("failed to allocate file: {e}"); break; } } - Err(e) => { - error!("failed to allocate file: {e}"); - break; - } } + }); + + Self { + dir, + file_size, + file_iter: Some(file_rx.into_iter()), + stopped, + file_alloc_task_handle: None, } - }); - - Ok(Self { - dir, - file_size, - file_stream: file_rx.into_iter(), - stopped, - }) + } } /// Stops the pipeline @@ -135,6 +150,11 @@ impl FilePipeline { impl Drop for FilePipeline { fn drop(&mut self) { self.stop(); + // Drops the file rx so that the allocation task could exit + drop(self.file_iter.take()); + if let Some(Err(e)) = self.file_alloc_task_handle.take().map(JoinHandle::join) { + error!("failed to join file allocation task: {e:?}"); + } } } @@ -145,7 +165,11 @@ impl Iterator for FilePipeline { if self.stopped.load(Ordering::Relaxed) { return None; } - self.file_stream.next().map(Ok) + self.file_iter + .as_mut() + .unwrap_or_else(|| unreachable!("Option is always `Some`")) + .next() + .map(Ok) } } @@ -167,7 +191,7 @@ mod tests { async fn file_pipeline_is_ok() { let file_size = 1024; let dir = tempfile::tempdir().unwrap(); - let mut pipeline = FilePipeline::new(dir.as_ref().into(), file_size).unwrap(); + let mut pipeline = FilePipeline::new(dir.as_ref().into(), file_size); let check_size = |mut file: LockedFile| { let file = file.into_std();