diff --git a/src/check.rs b/src/check.rs index 5ffc0ff5..4c890fe2 100644 --- a/src/check.rs +++ b/src/check.rs @@ -1,8 +1,9 @@ use crate::utils::is_revindex_database; +use anyhow::Result; use sourmash::index::revindex::{RevIndex, RevIndexOps}; -pub fn check(index: camino::Utf8PathBuf, quick: bool) -> Result<(), Box> { +pub fn check(index: camino::Utf8PathBuf, quick: bool) -> Result<()> { if !is_revindex_database(&index) { bail!("'{}' is not a valid RevIndex database", index); } diff --git a/src/fastgather.rs b/src/fastgather.rs index 31ab7217..789e9c63 100644 --- a/src/fastgather.rs +++ b/src/fastgather.rs @@ -126,9 +126,9 @@ pub fn fastgather( ) .ok(); - if let Err(e) = gather_out_thrd.join() { - eprintln!("Unable to join internal thread: {:?}", e); - } + gather_out_thrd + .join() + .expect("Unable to join internal thread"); Ok(()) } diff --git a/src/fastmultigather.rs b/src/fastmultigather.rs index cbfdbc7b..45f0d681 100644 --- a/src/fastmultigather.rs +++ b/src/fastmultigather.rs @@ -23,7 +23,7 @@ use sourmash::sketch::Sketch; use crate::utils::{ consume_query_by_gather, csvwriter_thread, load_collection, write_prefetch, - BranchwaterGatherResult, PrefetchResult, ReportType, + BranchwaterGatherResult, MultiCollection, PrefetchResult, ReportType, SmallSignature, }; #[allow(clippy::too_many_arguments)] @@ -81,9 +81,46 @@ pub fn fastmultigather( ReportType::Against, allow_failed_sigpaths, )?; - // load against sketches into memory - let against = against_collection.load_sketches()?; + let against_sketches = against_collection.load_sketches()?; + + let (n_processed, skipped_paths, failed_paths) = fastmultigather_obj( + &query_collection, + &against_sketches, + save_matches, + output_path, + threshold_hashes, + common_scaled, + create_empty_results, + )?; + + println!("DONE. Processed {} queries total.", n_processed); + + if skipped_paths > 0 { + eprintln!( + "WARNING: skipped {} query paths - no compatible signatures.", + skipped_paths + ); + } + if failed_paths > 0 { + eprintln!( + "WARNING: {} query paths failed to load. See error messages above.", + failed_paths + ); + } + + Ok(()) +} + +pub(crate) fn fastmultigather_obj( + query_collection: &MultiCollection, + against: &Vec, + save_matches: bool, + output_path: Option, + threshold_hashes: u64, + common_scaled: u32, + create_empty_results: bool, +) -> Result<(usize, usize, usize)> { // set up a multi-producer, single-consumer channel. let (send, recv) = std::sync::mpsc::sync_channel::(rayon::current_num_threads()); @@ -222,30 +259,13 @@ pub fn fastmultigather( }); drop(send); - if let Err(e) = gather_out_thrd.join() { - eprintln!("Unable to join internal thread: {:?}", e); - } - - println!( - "DONE. Processed {} queries total.", - processed_queries.into_inner() - ); - - let skipped_paths = skipped_paths.into_inner(); - let failed_paths = failed_paths.into_inner(); - - if skipped_paths > 0 { - eprintln!( - "WARNING: skipped {} query paths - no compatible signatures.", - skipped_paths - ); - } - if failed_paths > 0 { - eprintln!( - "WARNING: {} query paths failed to load. 
See error messages above.", - failed_paths - ); - } - - Ok(()) + gather_out_thrd + .join() + .expect("unable to join CSV writing thread!?"); + + Ok(( + processed_queries.into_inner(), + skipped_paths.into_inner(), + failed_paths.into_inner(), + )) } diff --git a/src/fastmultigather_rocksdb.rs b/src/fastmultigather_rocksdb.rs index 17c0b20f..cd88ebf8 100644 --- a/src/fastmultigather_rocksdb.rs +++ b/src/fastmultigather_rocksdb.rs @@ -12,7 +12,8 @@ use sourmash::sketch::minhash::KmerMinHash; use sourmash::storage::SigStore; use crate::utils::{ - csvwriter_thread, is_revindex_database, load_collection, BranchwaterGatherResult, ReportType, + csvwriter_thread, is_revindex_database, load_collection, BranchwaterGatherResult, + MultiCollection, ReportType, }; pub fn fastmultigather_rocksdb( @@ -22,7 +23,7 @@ pub fn fastmultigather_rocksdb( threshold_bp: u32, output: Option, allow_failed_sigpaths: bool, -) -> Result<(), Box> { +) -> Result<()> { if !is_revindex_database(&index) { bail!("'{}' is not a valid RevIndex database", index); } @@ -39,7 +40,9 @@ pub fn fastmultigather_rocksdb( let selection_scaled: u32 = match selection.scaled() { Some(scaled) => { if *max_db_scaled > scaled { - return Err("Error: database scaled is higher than requested scaled".into()); + return Err(anyhow::anyhow!( + "Error: database scaled is higher than requested scaled" + )); } scaled } @@ -59,6 +62,34 @@ pub fn fastmultigather_rocksdb( allow_failed_sigpaths, )?; + let (n_processed, skipped_paths, failed_paths) = + fastmultigather_rocksdb_obj(&query_collection, &db, &set_selection, threshold_bp, output)?; + + println!("DONE. Processed {} queries total.", n_processed); + + if skipped_paths > 0 { + eprintln!( + "WARNING: skipped {} query paths - no compatible signatures.", + skipped_paths + ); + } + if failed_paths > 0 { + eprintln!( + "WARNING: {} query paths failed to load. See error messages above.", + failed_paths + ); + } + + Ok(()) +} + +pub(crate) fn fastmultigather_rocksdb_obj( + query_collection: &MultiCollection, + db: &RevIndex, + selection: &Selection, + threshold_bp: u32, + output: Option, +) -> Result<(usize, usize, usize)> { // set up a multi-producer, single-consumer channel. let (send, recv) = std::sync::mpsc::sync_channel::(rayon::current_num_threads()); @@ -80,8 +111,8 @@ pub fn fastmultigather_rocksdb( let send = query_collection .par_iter() .filter_map(|(coll, _idx, record)| { - let threshold = threshold_bp / set_selection.scaled().expect("scaled is not set!?"); - let ksize = set_selection.ksize().expect("ksize not set!?"); + let threshold = threshold_bp / selection.scaled().expect("scaled is not set!?"); + let ksize = selection.ksize().expect("ksize not set!?"); // query downsampling happens here match coll.sig_from_record(record) { @@ -103,7 +134,7 @@ pub fn fastmultigather_rocksdb( hash_to_color, threshold as usize, &query_mh, - Some(set_selection.clone()), + Some(selection.clone()), ); if let Ok(matches) = matches { for match_ in &matches { @@ -178,55 +209,26 @@ pub fn fastmultigather_rocksdb( .flatten() .try_for_each_with(send, |s, m| s.send(m)); - let mut do_fail = false; - // do some cleanup and error handling - - if let Err(e) = send { - eprintln!("Unable to send internal data: {:?}", e); - do_fail = true; - } - - if let Err(e) = thrd.join() { - eprintln!("Unable to join internal thread: {:?}", e); - do_fail = true; - } + send.expect("Unable to send internal data"); + thrd.join().expect("Unable to join CSV writing thread."); // done! 
- let i: usize = processed_sigs.fetch_max(0, atomic::Ordering::SeqCst); - eprintln!("DONE. Processed {} search sigs", i); - - if i == 0 { - eprintln!("ERROR: no search sigs found!?"); - do_fail = true; - } - + let n_processed: usize = processed_sigs.fetch_max(0, atomic::Ordering::SeqCst); let skipped_paths = skipped_paths.load(atomic::Ordering::SeqCst); let failed_paths = failed_paths.load(atomic::Ordering::SeqCst); let failed_gathers = failed_gathers.load(atomic::Ordering::SeqCst); - if skipped_paths > 0 { - eprintln!( - "WARNING: skipped {} query paths - no compatible signatures.", - skipped_paths - ); - } - if failed_paths > 0 { - eprintln!( - "WARNING: {} query paths failed to load. See error messages above.", - failed_paths - ); + if n_processed == 0 { + return Err(anyhow::anyhow!("no search sigs found!?")); } + if failed_gathers > 0 { - eprintln!( - "ERROR: {} failed gathers. See error messages above.", + return Err(anyhow::anyhow!( + "{} failed gathers. See error messages above.", failed_gathers - ); - do_fail = true; + )); } - if do_fail { - bail!("Unresolvable errors found; results cannot be trusted. Quitting."); - } - - Ok(()) + Ok((n_processed, skipped_paths, failed_paths)) } diff --git a/src/index.rs b/src/index.rs index 31c54a3f..59e40cce 100644 --- a/src/index.rs +++ b/src/index.rs @@ -1,8 +1,11 @@ +use anyhow::Result; + use sourmash::index::revindex::RevIndex; use sourmash::index::revindex::RevIndexOps; use sourmash::prelude::*; use std::path::Path; +use crate::utils::MultiCollection; use crate::utils::{load_collection, ReportType}; use sourmash::collection::{Collection, CollectionSet}; @@ -10,10 +13,10 @@ pub fn index>( siglist: String, selection: Selection, output: P, - colors: bool, + use_colors: bool, allow_failed_sigpaths: bool, use_internal_storage: bool, -) -> Result<(), Box> { +) -> Result<()> { eprintln!("Loading sketches from {}", siglist); let multi = match load_collection( @@ -27,6 +30,15 @@ pub fn index>( }; eprintln!("Found {} sketches total.", multi.len()); + index_obj(multi, output, use_colors, use_internal_storage) +} + +pub(crate) fn index_obj>( + multi: MultiCollection, + output: P, + use_colors: bool, + use_internal_storage: bool, +) -> Result<()> { // Try to convert it into a Collection and then CollectionSet. let collection = match Collection::try_from(multi.clone()) { // conversion worked! 
@@ -54,7 +66,7 @@ pub fn index>( match collection { Ok(collection) => { eprintln!("Indexing {} sketches.", collection.len()); - let mut index = RevIndex::create(output.as_ref(), collection, colors)?; + let mut index = RevIndex::create(output.as_ref(), collection, use_colors)?; if use_internal_storage { index.internalize_storage()?; diff --git a/src/lib.rs b/src/lib.rs index 9df4958f..30921215 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -380,5 +380,6 @@ fn sourmash_plugin_branchwater(_py: Python, m: &Bound<'_, PyModule>) -> PyResult m.add_function(wrap_pyfunction!(do_pairwise, m)?)?; m.add_function(wrap_pyfunction!(do_cluster, m)?)?; m.add_function(wrap_pyfunction!(do_singlesketch, m)?)?; + Ok(()) } diff --git a/src/manysearch.rs b/src/manysearch.rs index 2ce8bcab..0c81e1a3 100644 --- a/src/manysearch.rs +++ b/src/manysearch.rs @@ -10,7 +10,8 @@ use std::sync::atomic; use std::sync::atomic::AtomicUsize; use crate::utils::{ - csvwriter_thread, load_collection, ManySearchResult, ReportType, SmallSignature, + csvwriter_thread, load_collection, ManySearchResult, MultiCollection, ReportType, + SmallSignature, }; use sourmash::ani_utils::ani_from_containment; use sourmash::errors::SourmashError; @@ -71,6 +72,43 @@ pub fn manysearch( allow_failed_sigpaths, )?; + let (n_processed, skipped_paths, failed_paths) = manysearch_obj( + &query_sketchlist, + &against_collection, + threshold, + common_scaled, + output, + ignore_abundance, + output_all_comparisons, + )?; + + eprintln!("DONE. Processed {} search sigs", n_processed); + + if skipped_paths > 0 { + eprintln!( + "WARNING: skipped {} search paths - no compatible signatures.", + skipped_paths + ); + } + if failed_paths > 0 { + eprintln!( + "WARNING: {} search paths failed to load. See error messages above.", + failed_paths + ); + } + + Ok(()) +} + +pub(crate) fn manysearch_obj( + query_sketchlist: &Vec, + against_collection: &MultiCollection, + threshold: f64, + common_scaled: u32, + output: Option, + ignore_abundance: bool, + output_all_comparisons: bool, +) -> Result<(usize, usize, usize)> { // set up a multi-producer, single-consumer channel. let (send, recv) = std::sync::mpsc::sync_channel::(rayon::current_num_threads()); @@ -144,36 +182,16 @@ pub fn manysearch( .flatten() .try_for_each_with(send, |s, m| s.send(m)); - // do some cleanup and error handling - - if let Err(e) = send { - eprintln!("Unable to send internal data: {:?}", e); - } - - if let Err(e) = thrd.join() { - eprintln!("Unable to join internal thread: {:?}", e); - } + send.expect("Unable to send internal data"); + thrd.join().expect("Unable to join internal thread."); // done! let i: usize = processed_sigs.fetch_max(0, atomic::Ordering::SeqCst); - eprintln!("DONE. Processed {} search sigs", i); let skipped_paths = skipped_paths.load(atomic::Ordering::SeqCst); let failed_paths = failed_paths.load(atomic::Ordering::SeqCst); - if skipped_paths > 0 { - eprintln!( - "WARNING: skipped {} search paths - no compatible signatures.", - skipped_paths - ); - } - if failed_paths > 0 { - eprintln!( - "WARNING: {} search paths failed to load. 
See error messages above.", - failed_paths - ); - } - - Ok(()) + Ok((i, skipped_paths, failed_paths)) } // inflate_abundances: "borrow" the abundances from 'against' onto the diff --git a/src/manysearch_rocksdb.rs b/src/manysearch_rocksdb.rs index 2ca36fe4..b6c5660a 100644 --- a/src/manysearch_rocksdb.rs +++ b/src/manysearch_rocksdb.rs @@ -14,7 +14,8 @@ use sourmash::sketch::minhash::KmerMinHash; use sourmash::storage::SigStore; use crate::utils::{ - csvwriter_thread, is_revindex_database, load_collection, ManySearchResult, ReportType, + csvwriter_thread, is_revindex_database, load_collection, ManySearchResult, MultiCollection, + ReportType, }; pub fn manysearch_rocksdb( @@ -25,7 +26,7 @@ pub fn manysearch_rocksdb( output: Option, allow_failed_sigpaths: bool, output_all_comparisons: bool, -) -> Result<(), Box> { +) -> Result<()> { if !is_revindex_database(&index) { bail!("'{}' is not a valid RevIndex database", index); } @@ -44,7 +45,9 @@ pub fn manysearch_rocksdb( let selection_scaled: u32 = match selection.scaled() { Some(scaled) => { if *max_db_scaled > scaled { - return Err("Error: database scaled is higher than requested scaled".into()); + return Err(anyhow::anyhow!( + "Error: database scaled is higher than requested scaled" + )); } scaled } @@ -65,6 +68,40 @@ pub fn manysearch_rocksdb( allow_failed_sigpaths, )?; + let (n_processed, skipped_paths, failed_paths) = manysearch_rocksdb_obj( + &query_collection, + &db, + minimum_containment, + output, + output_all_comparisons, + )?; + + // done! + eprintln!("DONE. Processed {} search sigs", n_processed); + + if skipped_paths > 0 { + eprintln!( + "WARNING: skipped {} query paths - no compatible signatures.", + skipped_paths + ); + } + if failed_paths > 0 { + eprintln!( + "WARNING: {} query paths failed to load. See error messages above.", + failed_paths + ); + } + + Ok(()) +} + +pub(crate) fn manysearch_rocksdb_obj( + query_collection: &MultiCollection, + db: &RevIndex, + minimum_containment: f64, + output: Option, + output_all_comparisons: bool, +) -> Result<(usize, usize, usize)> { // set up a multi-producer, single-consumer channel. let (send, recv) = std::sync::mpsc::sync_channel::(rayon::current_num_threads()); @@ -167,35 +204,13 @@ pub fn manysearch_rocksdb( } }); - // do some cleanup and error handling - - if let Err(e) = send_result { - eprintln!("Error during parallel processing: {}", e); - } - - // join the writer thread - if let Err(e) = thrd.join() { - eprintln!("Unable to join internal thread: {:?}", e); - } + send_result.expect("Error during parallel processing"); + thrd.join().expect("Unable to join internal thread."); - // done! - let i: usize = processed_sigs.fetch_max(0, atomic::Ordering::SeqCst); - eprintln!("DONE. Processed {} search sigs", i); + let i = processed_sigs.load(atomic::Ordering::SeqCst); let skipped_paths = skipped_paths.load(atomic::Ordering::SeqCst); let failed_paths = failed_paths.load(atomic::Ordering::SeqCst); - if skipped_paths > 0 { - eprintln!( - "WARNING: skipped {} query paths - no compatible signatures.", - skipped_paths - ); - } - if failed_paths > 0 { - eprintln!( - "WARNING: {} query paths failed to load. 
See error messages above.", - failed_paths - ); - } - - Ok(()) + Ok((i, skipped_paths, failed_paths)) } diff --git a/src/manysketch.rs b/src/manysketch.rs index 6a08dea1..9026e538 100644 --- a/src/manysketch.rs +++ b/src/manysketch.rs @@ -16,7 +16,7 @@ pub fn manysketch( output: String, singleton: bool, force: bool, -) -> Result<(), Box> { +) -> Result<()> { let (fileinfo, n_fastas) = match load_fasta_fromfile(filelist, force) { Ok((file_info, n_fastas)) => (file_info, n_fastas), Err(e) => bail!("Could not load fromfile csv. Underlying error: {}", e), diff --git a/src/multisearch.rs b/src/multisearch.rs index 3ebd24f8..c4e053bb 100644 --- a/src/multisearch.rs +++ b/src/multisearch.rs @@ -147,7 +147,7 @@ pub fn multisearch( estimate_prob_overlap: bool, output_all_comparisons: bool, output: Option, -) -> Result<(), Box> { +) -> Result<()> { // Load all queries into memory at once. let query_collection = load_collection( &query_filepath, @@ -188,6 +188,34 @@ pub fn multisearch( let againsts: Vec = against_collection.load_sketches()?; + let n_processed = multisearch_obj( + &queries, + &againsts, + threshold, + estimate_ani, + estimate_prob_overlap, + output_all_comparisons, + output, + expected_scaled, + ksize, + )?; + + eprintln!("DONE. Processed {} comparisons", n_processed); + + Ok(()) +} + +pub(crate) fn multisearch_obj( + queries: &Vec, + againsts: &Vec, + threshold: f64, + estimate_ani: bool, + estimate_prob_overlap: bool, + output_all_comparisons: bool, + output: Option, + expected_scaled: u32, + ksize: f64, +) -> Result { let ( n_comparisons, query_merged_frequencies, @@ -330,17 +358,10 @@ pub fn multisearch( .try_for_each_with(send, |s, m| s.send(m)); // do some cleanup and error handling - - if let Err(e) = send { - eprintln!("Unable to send internal data: {:?}", e); - } - - if let Err(e) = thrd.join() { - eprintln!("Unable to join internal thread: {:?}", e); - } + send.expect("Unable to send internal data"); + thrd.join().expect("Unable to join internal thread"); // done! let i: usize = processed_cmp.fetch_max(0, atomic::Ordering::SeqCst); - eprintln!("DONE. Processed {} comparisons", i); - - Ok(()) + Ok(i) } diff --git a/src/pairwise.rs b/src/pairwise.rs index f5d2d362..f5cb53d3 100644 --- a/src/pairwise.rs +++ b/src/pairwise.rs @@ -4,7 +4,9 @@ use rayon::prelude::*; use std::sync::atomic; use std::sync::atomic::AtomicUsize; -use crate::utils::{csvwriter_thread, load_collection, MultiSearchResult, ReportType}; +use crate::utils::{ + csvwriter_thread, load_collection, MultiSearchResult, ReportType, SmallSignature, +}; use sourmash::ani_utils::ani_from_containment; use sourmash::selection::Selection; use sourmash::signature::SigsTrait; @@ -22,7 +24,7 @@ pub fn pairwise( write_all: bool, output_all_comparisons: bool, output: Option, -) -> Result<(), Box> { +) -> Result<()> { // Load all sigs into memory at once. let collection = load_collection( &siglist, @@ -53,7 +55,31 @@ pub fn pairwise( selection.set_scaled(common_scaled); let sketches = collection.load_sketches()?; + let ksize = selection.ksize().unwrap() as f64; + + let n_processed = pairwise_obj( + &sketches, + estimate_ani, + write_all, + output_all_comparisons, + output, + threshold, + ksize, + )?; + eprintln!("DONE. Processed {} comparisons", n_processed); + + Ok(()) +} +pub(crate) fn pairwise_obj( + sketches: &Vec, + estimate_ani: bool, + write_all: bool, + output_all_comparisons: bool, + output: Option, + threshold: f64, + ksize: f64, +) -> Result { // set up a multi-producer, single-consumer channel. 
let (send, recv) = std::sync::mpsc::sync_channel::(rayon::current_num_threads()); @@ -66,7 +92,6 @@ pub fn pairwise( // Results written to the writer thread above. let processed_cmp = AtomicUsize::new(0); - let ksize = selection.ksize().unwrap() as f64; sketches.par_iter().enumerate().for_each(|(idx, query)| { for against in sketches.iter().skip(idx + 1) { @@ -184,13 +209,9 @@ pub fn pairwise( // do some cleanup and error handling - drop(send); // close the channel - if let Err(e) = thrd.join() { - eprintln!("Unable to join internal thread: {:?}", e); - } + thrd.join().expect("Unable to join internal thread"); // done! let i: usize = processed_cmp.load(atomic::Ordering::SeqCst); - eprintln!("DONE. Processed {} comparisons", i); - - Ok(()) + Ok(i) } diff --git a/src/python/tests/test_fastmultigather.py b/src/python/tests/test_fastmultigather.py index 69eab9cf..257bb097 100644 --- a/src/python/tests/test_fastmultigather.py +++ b/src/python/tests/test_fastmultigather.py @@ -1836,11 +1836,7 @@ def test_rocksdb_no_internal_storage_gather_fails(runtmp, capfd): print(captured.err) assert "Error gathering matches:" in captured.err - assert "ERROR: 1 failed gathers. See error messages above." in captured.err - assert ( - "Unresolvable errors found; results cannot be trusted. Quitting." - in captured.err - ) + assert "1 failed gathers. See error messages above." in captured.err def test_save_matches(runtmp): diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 2ee44c84..89ad8b5a 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -99,7 +99,7 @@ pub fn write_prefetch( query_md5: String, prefetch_output: Option, matchlist: &BinaryHeap, -) -> Result<(), Box> { +) -> Result<()> { // Define the writer to stdout by default let mut writer: Box = Box::new(std::io::stdout());
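Note on the recurring pattern in this patch: each command's fallible entry point now returns `anyhow::Result<()>`, and the command body is split into a thin CLI-facing wrapper plus a `pub(crate) *_obj` worker that returns `(n_processed, skipped_paths, failed_paths)` instead of printing warnings and deciding exit status itself. The sketch below is an illustrative, self-contained reduction of that shape; it assumes only the `anyhow` crate, and the names `run`, `run_obj`, and their parameters are hypothetical, not functions from this crate.

    use anyhow::{anyhow, Result};

    // Hypothetical worker in the style of the new `*_obj` functions: it does the
    // computation and returns counts, leaving all reporting to the caller.
    fn run_obj(items: &[u64], threshold: u64) -> Result<(usize, usize, usize)> {
        let mut processed = 0usize;
        let mut skipped = 0usize;
        let failed = 0usize;

        for &item in items {
            if item < threshold {
                skipped += 1;
                continue;
            }
            processed += 1;
        }

        if processed == 0 {
            // Hard failures become anyhow errors rather than eprintln! plus a do_fail flag.
            return Err(anyhow!("no items processed!?"));
        }
        Ok((processed, skipped, failed))
    }

    // Hypothetical CLI-facing wrapper: calls the worker, then prints the summary
    // and warnings, mirroring the wrappers left in place by this patch.
    fn run(items: &[u64], threshold: u64) -> Result<()> {
        let (n_processed, skipped, failed) = run_obj(items, threshold)?;
        println!("DONE. Processed {} items total.", n_processed);
        if skipped > 0 {
            eprintln!("WARNING: skipped {} items.", skipped);
        }
        if failed > 0 {
            eprintln!("WARNING: {} items failed. See error messages above.", failed);
        }
        Ok(())
    }

    fn main() -> Result<()> {
        run(&[1, 5, 10], 3)
    }

Keeping the workers free of printing and process-level error handling is what lets the Python-facing wrappers (and tests) call them directly and decide for themselves how to report skipped or failed paths.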