Skip to content

Commit

Permalink
Merge pull request #19 from bluegenes/err-handling
Browse files Browse the repository at this point in the history
fix error handling
  • Loading branch information
bluegenes authored May 9, 2024
2 parents a84b98a + 5b738fd commit 835b039
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

[package]
name = "sourmash_plugin_directsketch"
version = "0.2.1"
version = "0.2.2"
edition = "2021"

[lib]
Expand Down
39 changes: 28 additions & 11 deletions src/directsketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::collections::HashMap;
use std::fs::{self, create_dir_all};
use std::io::Cursor;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter};
Expand Down Expand Up @@ -581,6 +582,21 @@ pub fn failures_handle(
})
}

pub fn error_handler(
mut recv_errors: tokio::sync::mpsc::Receiver<anyhow::Error>,
error_flag: Arc<AtomicBool>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
while let Some(error) = recv_errors.recv().await {
eprintln!("Error: {}", error);
if error.to_string().contains("No signatures written") {
error_flag.store(true, Ordering::SeqCst);
break;
}
}
})
}

#[tokio::main]
#[allow(clippy::too_many_arguments)]
pub async fn download_and_sketch(
Expand Down Expand Up @@ -611,17 +627,20 @@ pub async fn download_and_sketch(

// // create channels. buffer size can be changed - here it is 4 b/c we can do 3 downloads simultaneously
// // to do: see whether increasing buffer size speeds things up
let (send_sigs, recv_sigs) = tokio::sync::mpsc::channel::<Vec<Signature>>(4);
let (send_failed, recv_failed) = tokio::sync::mpsc::channel::<FailedDownload>(4);
let (send_sigs, recv_sigs) = tokio::sync::mpsc::channel::<Vec<Signature>>(1000);
let (send_failed, recv_failed) = tokio::sync::mpsc::channel::<FailedDownload>(100);
// // Error channel for handling task errors
let (error_sender, mut error_receiver) = tokio::sync::mpsc::channel::<anyhow::Error>(1);
let (error_sender, error_receiver) = tokio::sync::mpsc::channel::<anyhow::Error>(1);

// // // Set up collector/writing tasks
let mut handles = Vec::new();
let sig_handle = sigwriter_handle(recv_sigs, output_sigs, error_sender.clone());
let failures_handle = failures_handle(failed_csv, recv_failed, error_sender.clone());
let critical_error_flag = Arc::new(AtomicBool::new(false));
let error_handle = error_handler(error_receiver, critical_error_flag.clone());
handles.push(sig_handle);
handles.push(failures_handle);
handles.push(error_handle);

// // Worker tasks
let semaphore = Arc::new(Semaphore::new(3)); // Limiting concurrent downloads
Expand Down Expand Up @@ -713,16 +732,14 @@ pub async fn download_and_sketch(
// Wait for all tasks to complete
for handle in handles {
if let Err(e) = handle.await {
eprintln!("A task encountered an error: {}", e);
eprintln!("Handle join error: {}.", e);
}
}
// // Handle errors received from the error channel
while let Some(error) = error_receiver.recv().await {
eprintln!("Error: {}", error);
// Check if the error message contains "No signatures written"
if error.to_string().contains("No signatures written") & !download_only {
bail!("{}.", error);
}
// since the only critical error is not having written any sigs
// check this here at end. Bail if we wrote expected sigs but wrote none.
if critical_error_flag.load(Ordering::SeqCst) & !download_only {
bail!("No signatures written, exiting.");
}

Ok(())
}
2 changes: 1 addition & 1 deletion tests/test_gbsketch.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,4 +292,4 @@ def test_gbsketch_bad_acc_fail(runtmp, capfd):
captured = capfd.readouterr()
print(captured.out)
print(captured.err)
assert "Error: No signatures written." in captured.err
assert "Error: No signatures written, exiting." in captured.err

0 comments on commit 835b039

Please sign in to comment.