Skip to content

Commit

Permalink
refactor: exit allocation task on stop
Browse files Browse the repository at this point in the history
Signed-off-by: bsbds <[email protected]>
  • Loading branch information
bsbds authored and mergify[bot] committed Mar 5, 2024
1 parent 19fc16e commit 941449e
Showing 1 changed file with 15 additions and 12 deletions.
27 changes: 15 additions & 12 deletions crates/curp/src/server/storage/wal/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ impl FilePipeline {
Self::clean_up(&dir)?;

let (file_tx, file_rx) = flume::bounded(1);
let stop_event = Event::new();
let dir_c = dir.clone();
let stopped = Arc::new(AtomicBool::new(false));
let stopped_c = Arc::clone(&stopped);
Expand All @@ -49,12 +48,18 @@ impl FilePipeline {
let _ignore = std::thread::spawn(move || {
let mut file_count = 0;
loop {
match Self::alloc(&dir_c, file_size, &mut file_count, &stopped_c) {
match Self::alloc(&dir_c, file_size, &mut file_count) {
Ok(file) => {
if file_tx.send(file).is_err() {
// The receiver is already dropped, stop this task
break;
}
if stopped_c.load(Ordering::Relaxed) {
if let Err(e) = Self::clean_up(&dir_c) {
error!("failed to clean up pipeline temp files: {e}");
}
break;
}
}
Err(e) => {
error!("failed to allocate file: {e}");
Expand All @@ -68,12 +73,18 @@ impl FilePipeline {
let _ignore = tokio::spawn(async move {
let mut file_count = 0;
loop {
match Self::alloc(&dir_c, file_size, &mut file_count, &stopped_c) {
match Self::alloc(&dir_c, file_size, &mut file_count) {
Ok(file) => {
if file_tx.send_async(file).await.is_err() {
// The receiver is already dropped, stop this task
break;
}
if stopped_c.load(Ordering::Relaxed) {
if let Err(e) = Self::clean_up(&dir_c) {
error!("failed to clean up pipeline temp files: {e}");
}
break;
}
}
Err(e) => {
error!("failed to allocate file: {e}");
Expand All @@ -97,19 +108,11 @@ impl FilePipeline {
}

/// Allocates a a new tempfile
fn alloc(
dir: &PathBuf,
file_size: u64,
file_count: &mut usize,
stopped: &AtomicBool,
) -> io::Result<LockedFile> {
fn alloc(dir: &PathBuf, file_size: u64, file_count: &mut usize) -> io::Result<LockedFile> {
let fpath = PathBuf::from(dir).join(format!("{file_count}{TEMP_FILE_EXT}"));
let mut file = LockedFile::open_rw(fpath)?;
file.preallocate(file_size)?;
*file_count = file_count.wrapping_add(1);
if stopped.load(Ordering::Relaxed) {
Self::clean_up(dir)?;
}
Ok(file)
}

Expand Down

0 comments on commit 941449e

Please sign in to comment.