Skip to content

Commit

Permalink
refactor: wal pipeline will exit task on drop
Browse files Browse the repository at this point in the history
Signed-off-by: bsbds <[email protected]>
  • Loading branch information
bsbds committed Apr 19, 2024
1 parent 45e4c88 commit 0395358
Showing 1 changed file with 71 additions and 47 deletions.
118 changes: 71 additions & 47 deletions crates/curp/src/server/storage/wal/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use std::{
Arc,
},
task::Poll,
thread::JoinHandle,
};

use clippy_utilities::OverflowArithmetic;
use event_listener::Event;
use flume::r#async::RecvStream;
use futures::{FutureExt, StreamExt};
use thiserror::Error;
use tokio::task::JoinHandle;
use tokio_stream::Stream;
use tracing::error;

Expand All @@ -28,15 +28,17 @@ pub(super) struct FilePipeline {
dir: PathBuf,
/// The size of the temp file
file_size: u64,
/// The file receive stream
file_stream: flume::IntoIter<LockedFile>,
/// The file receive iterator
file_iter: Option<flume::IntoIter<LockedFile>>,
/// Stopped flag
stopped: Arc<AtomicBool>,
/// Join handle of the allocation task
file_alloc_task_handle: Option<JoinHandle<()>>,
}

impl FilePipeline {
/// Creates a new `FilePipeline`
pub(super) fn new(dir: PathBuf, file_size: u64) -> io::Result<Self> {
pub(super) fn new(dir: PathBuf, file_size: u64) -> Self {
if let Err(e) = Self::clean_up(&dir) {
error!("Failed to clean up tmp files: {e}");
}
Expand All @@ -47,61 +49,74 @@ impl FilePipeline {
let stopped_c = Arc::clone(&stopped);

#[cfg(not(madsim))]
let _ignore = std::thread::spawn(move || {
let mut file_count = 0;
loop {
match Self::alloc(&dir_c, file_size, &mut file_count) {
Ok(file) => {
if file_tx.send(file).is_err() {
// The receiver is already dropped, stop this task
break;
}
if stopped_c.load(Ordering::Relaxed) {
if let Err(e) = Self::clean_up(&dir_c) {
error!("failed to clean up pipeline temp files: {e}");
{
let file_alloc_task_handle = std::thread::spawn(move || {
let mut file_count = 0;
loop {
match Self::alloc(&dir_c, file_size, &mut file_count) {
Ok(file) => {
if file_tx.send(file).is_err() {
// The receiver is already dropped, stop this task
break;
}
if stopped_c.load(Ordering::Relaxed) {
if let Err(e) = Self::clean_up(&dir_c) {
error!("failed to clean up pipeline temp files: {e}");
}
break;
}
}
Err(e) => {
error!("failed to allocate file: {e}");
break;
}
}
Err(e) => {
error!("failed to allocate file: {e}");
break;
}
}
});

Self {
dir,
file_size,
file_iter: Some(file_rx.into_iter()),
stopped,
file_alloc_task_handle: Some(file_alloc_task_handle),
}
});
}

#[cfg(madsim)]
let _ignore = tokio::spawn(async move {
let mut file_count = 0;
loop {
match Self::alloc(&dir_c, file_size, &mut file_count) {
Ok(file) => {
if file_tx.send_async(file).await.is_err() {
// The receiver is already dropped, stop this task
break;
}
if stopped_c.load(Ordering::Relaxed) {
if let Err(e) = Self::clean_up(&dir_c) {
error!("failed to clean up pipeline temp files: {e}");
{
let _ignore = tokio::spawn(async move {
let mut file_count = 0;
loop {
match Self::alloc(&dir_c, file_size, &mut file_count) {
Ok(file) => {
if file_tx.send_async(file).await.is_err() {
// The receiver is already dropped, stop this task
break;
}
if stopped_c.load(Ordering::Relaxed) {
if let Err(e) = Self::clean_up(&dir_c) {
error!("failed to clean up pipeline temp files: {e}");
}
break;
}
}
Err(e) => {
error!("failed to allocate file: {e}");
break;
}
}
Err(e) => {
error!("failed to allocate file: {e}");
break;
}
}
});

Self {
dir,
file_size,
file_iter: Some(file_rx.into_iter()),
stopped,
file_alloc_task_handle: None,
}
});

Ok(Self {
dir,
file_size,
file_stream: file_rx.into_iter(),
stopped,
})
}
}

/// Stops the pipeline
Expand Down Expand Up @@ -135,6 +150,11 @@ impl FilePipeline {
impl Drop for FilePipeline {
fn drop(&mut self) {
self.stop();
// Drops the file rx so that the allocation task could exit
drop(self.file_iter.take());
if let Some(Err(e)) = self.file_alloc_task_handle.take().map(JoinHandle::join) {
error!("failed to join file allocation task: {e:?}");
}
}
}

Expand All @@ -145,7 +165,11 @@ impl Iterator for FilePipeline {
if self.stopped.load(Ordering::Relaxed) {
return None;
}
self.file_stream.next().map(Ok)
self.file_iter
.as_mut()
.unwrap_or_else(|| unreachable!("Option is always `Some`"))
.next()
.map(Ok)
}
}

Expand All @@ -167,7 +191,7 @@ mod tests {
async fn file_pipeline_is_ok() {
let file_size = 1024;
let dir = tempfile::tempdir().unwrap();
let mut pipeline = FilePipeline::new(dir.as_ref().into(), file_size).unwrap();
let mut pipeline = FilePipeline::new(dir.as_ref().into(), file_size);

let check_size = |mut file: LockedFile| {
let file = file.into_std();
Expand Down

0 comments on commit 0395358

Please sign in to comment.