Skip to content

Commit

Permalink
MRG: refactor the internal Rust API to support container-level functi…
Browse files Browse the repository at this point in the history
…onality (#569)

* refactor CSV output for fastgather/fastmultigather to use mpsc

* cargo fmt

* tests mostly pass

* fix skipmer test

* upd comment

* black

* switch to min_max_scaled for rocksdb

* black

* ensure overlap is > 0

* rm print

* cleanup

* fix clippy messages about too-complex returns

* cargo fmt

* separate out Rust API changes from #552

* upd overlap

* fix

* fix

* remove remaining Box/error stuff

* remove remaining join => Err

* clean up error handling a bit

* refactor out loading of sketches

* fix tests

* update docs

* upd

* break test again

* do heinous dev stuff

* fix fix comment

* upd

* upd

* do not require -o after all

* fix for crate_empty_results

* fmt

* remove @ctb notes

* remove obligatory clone from Rust layer
  • Loading branch information
ctb authored Jan 7, 2025
1 parent ba51078 commit 99c0a87
Show file tree
Hide file tree
Showing 13 changed files with 268 additions and 161 deletions.
3 changes: 2 additions & 1 deletion src/check.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::utils::is_revindex_database;
use anyhow::Result;

use sourmash::index::revindex::{RevIndex, RevIndexOps};

pub fn check(index: camino::Utf8PathBuf, quick: bool) -> Result<(), Box<dyn std::error::Error>> {
pub fn check(index: camino::Utf8PathBuf, quick: bool) -> Result<()> {
if !is_revindex_database(&index) {
bail!("'{}' is not a valid RevIndex database", index);
}
Expand Down
6 changes: 3 additions & 3 deletions src/fastgather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ pub fn fastgather(
)
.ok();

if let Err(e) = gather_out_thrd.join() {
eprintln!("Unable to join internal thread: {:?}", e);
}
gather_out_thrd
.join()
.expect("Unable to join internal thread");

Ok(())
}
78 changes: 49 additions & 29 deletions src/fastmultigather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use sourmash::sketch::Sketch;

use crate::utils::{
consume_query_by_gather, csvwriter_thread, load_collection, write_prefetch,
BranchwaterGatherResult, PrefetchResult, ReportType,
BranchwaterGatherResult, MultiCollection, PrefetchResult, ReportType, SmallSignature,
};

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -81,9 +81,46 @@ pub fn fastmultigather(
ReportType::Against,
allow_failed_sigpaths,
)?;
// load against sketches into memory
let against = against_collection.load_sketches()?;

let against_sketches = against_collection.load_sketches()?;

let (n_processed, skipped_paths, failed_paths) = fastmultigather_obj(
&query_collection,
&against_sketches,
save_matches,
output_path,
threshold_hashes,
common_scaled,
create_empty_results,
)?;

println!("DONE. Processed {} queries total.", n_processed);

if skipped_paths > 0 {
eprintln!(
"WARNING: skipped {} query paths - no compatible signatures.",
skipped_paths
);
}
if failed_paths > 0 {
eprintln!(
"WARNING: {} query paths failed to load. See error messages above.",
failed_paths
);
}

Ok(())
}

pub(crate) fn fastmultigather_obj(
query_collection: &MultiCollection,
against: &Vec<SmallSignature>,
save_matches: bool,
output_path: Option<String>,
threshold_hashes: u64,
common_scaled: u32,
create_empty_results: bool,
) -> Result<(usize, usize, usize)> {
// set up a multi-producer, single-consumer channel.
let (send, recv) =
std::sync::mpsc::sync_channel::<BranchwaterGatherResult>(rayon::current_num_threads());
Expand Down Expand Up @@ -222,30 +259,13 @@ pub fn fastmultigather(
});

drop(send);
if let Err(e) = gather_out_thrd.join() {
eprintln!("Unable to join internal thread: {:?}", e);
}

println!(
"DONE. Processed {} queries total.",
processed_queries.into_inner()
);

let skipped_paths = skipped_paths.into_inner();
let failed_paths = failed_paths.into_inner();

if skipped_paths > 0 {
eprintln!(
"WARNING: skipped {} query paths - no compatible signatures.",
skipped_paths
);
}
if failed_paths > 0 {
eprintln!(
"WARNING: {} query paths failed to load. See error messages above.",
failed_paths
);
}

Ok(())
gather_out_thrd
.join()
.expect("unable to join CSV writing thread!?");

Ok((
processed_queries.into_inner(),
skipped_paths.into_inner(),
failed_paths.into_inner(),
))
}
92 changes: 47 additions & 45 deletions src/fastmultigather_rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use sourmash::sketch::minhash::KmerMinHash;
use sourmash::storage::SigStore;

use crate::utils::{
csvwriter_thread, is_revindex_database, load_collection, BranchwaterGatherResult, ReportType,
csvwriter_thread, is_revindex_database, load_collection, BranchwaterGatherResult,
MultiCollection, ReportType,
};

pub fn fastmultigather_rocksdb(
Expand All @@ -22,7 +23,7 @@ pub fn fastmultigather_rocksdb(
threshold_bp: u32,
output: Option<String>,
allow_failed_sigpaths: bool,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<()> {
if !is_revindex_database(&index) {
bail!("'{}' is not a valid RevIndex database", index);
}
Expand All @@ -39,7 +40,9 @@ pub fn fastmultigather_rocksdb(
let selection_scaled: u32 = match selection.scaled() {
Some(scaled) => {
if *max_db_scaled > scaled {
return Err("Error: database scaled is higher than requested scaled".into());
return Err(anyhow::anyhow!(
"Error: database scaled is higher than requested scaled"
));
}
scaled
}
Expand All @@ -59,6 +62,34 @@ pub fn fastmultigather_rocksdb(
allow_failed_sigpaths,
)?;

let (n_processed, skipped_paths, failed_paths) =
fastmultigather_rocksdb_obj(&query_collection, &db, &set_selection, threshold_bp, output)?;

println!("DONE. Processed {} queries total.", n_processed);

if skipped_paths > 0 {
eprintln!(
"WARNING: skipped {} query paths - no compatible signatures.",
skipped_paths
);
}
if failed_paths > 0 {
eprintln!(
"WARNING: {} query paths failed to load. See error messages above.",
failed_paths
);
}

Ok(())
}

pub(crate) fn fastmultigather_rocksdb_obj(
query_collection: &MultiCollection,
db: &RevIndex,
selection: &Selection,
threshold_bp: u32,
output: Option<String>,
) -> Result<(usize, usize, usize)> {
// set up a multi-producer, single-consumer channel.
let (send, recv) =
std::sync::mpsc::sync_channel::<BranchwaterGatherResult>(rayon::current_num_threads());
Expand All @@ -80,8 +111,8 @@ pub fn fastmultigather_rocksdb(
let send = query_collection
.par_iter()
.filter_map(|(coll, _idx, record)| {
let threshold = threshold_bp / set_selection.scaled().expect("scaled is not set!?");
let ksize = set_selection.ksize().expect("ksize not set!?");
let threshold = threshold_bp / selection.scaled().expect("scaled is not set!?");
let ksize = selection.ksize().expect("ksize not set!?");

// query downsampling happens here
match coll.sig_from_record(record) {
Expand All @@ -103,7 +134,7 @@ pub fn fastmultigather_rocksdb(
hash_to_color,
threshold as usize,
&query_mh,
Some(set_selection.clone()),
Some(selection.clone()),
);
if let Ok(matches) = matches {
for match_ in &matches {
Expand Down Expand Up @@ -178,55 +209,26 @@ pub fn fastmultigather_rocksdb(
.flatten()
.try_for_each_with(send, |s, m| s.send(m));

let mut do_fail = false;

// do some cleanup and error handling -
if let Err(e) = send {
eprintln!("Unable to send internal data: {:?}", e);
do_fail = true;
}

if let Err(e) = thrd.join() {
eprintln!("Unable to join internal thread: {:?}", e);
do_fail = true;
}
send.expect("Unable to send internal data");
thrd.join().expect("Unable to join CSV writing thread.");

// done!
let i: usize = processed_sigs.fetch_max(0, atomic::Ordering::SeqCst);
eprintln!("DONE. Processed {} search sigs", i);

if i == 0 {
eprintln!("ERROR: no search sigs found!?");
do_fail = true;
}

let n_processed: usize = processed_sigs.fetch_max(0, atomic::Ordering::SeqCst);
let skipped_paths = skipped_paths.load(atomic::Ordering::SeqCst);
let failed_paths = failed_paths.load(atomic::Ordering::SeqCst);
let failed_gathers = failed_gathers.load(atomic::Ordering::SeqCst);

if skipped_paths > 0 {
eprintln!(
"WARNING: skipped {} query paths - no compatible signatures.",
skipped_paths
);
}
if failed_paths > 0 {
eprintln!(
"WARNING: {} query paths failed to load. See error messages above.",
failed_paths
);
if n_processed == 0 {
return Err(anyhow::anyhow!("no search sigs found!?"));
}

if failed_gathers > 0 {
eprintln!(
"ERROR: {} failed gathers. See error messages above.",
return Err(anyhow::anyhow!(
"{} failed gathers. See error messages above.",
failed_gathers
);
do_fail = true;
));
}

if do_fail {
bail!("Unresolvable errors found; results cannot be trusted. Quitting.");
}

Ok(())
Ok((n_processed, skipped_paths, failed_paths))
}
18 changes: 15 additions & 3 deletions src/index.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
use anyhow::Result;

use sourmash::index::revindex::RevIndex;
use sourmash::index::revindex::RevIndexOps;
use sourmash::prelude::*;
use std::path::Path;

use crate::utils::MultiCollection;
use crate::utils::{load_collection, ReportType};
use sourmash::collection::{Collection, CollectionSet};

pub fn index<P: AsRef<Path>>(
siglist: String,
selection: Selection,
output: P,
colors: bool,
use_colors: bool,
allow_failed_sigpaths: bool,
use_internal_storage: bool,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<()> {
eprintln!("Loading sketches from {}", siglist);

let multi = match load_collection(
Expand All @@ -27,6 +30,15 @@ pub fn index<P: AsRef<Path>>(
};
eprintln!("Found {} sketches total.", multi.len());

index_obj(multi, output, use_colors, use_internal_storage)
}

pub(crate) fn index_obj<P: AsRef<Path>>(
multi: MultiCollection,
output: P,
use_colors: bool,
use_internal_storage: bool,
) -> Result<()> {
// Try to convert it into a Collection and then CollectionSet.
let collection = match Collection::try_from(multi.clone()) {
// conversion worked!
Expand Down Expand Up @@ -54,7 +66,7 @@ pub fn index<P: AsRef<Path>>(
match collection {
Ok(collection) => {
eprintln!("Indexing {} sketches.", collection.len());
let mut index = RevIndex::create(output.as_ref(), collection, colors)?;
let mut index = RevIndex::create(output.as_ref(), collection, use_colors)?;

if use_internal_storage {
index.internalize_storage()?;
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,5 +380,6 @@ fn sourmash_plugin_branchwater(_py: Python, m: &Bound<'_, PyModule>) -> PyResult
m.add_function(wrap_pyfunction!(do_pairwise, m)?)?;
m.add_function(wrap_pyfunction!(do_cluster, m)?)?;
m.add_function(wrap_pyfunction!(do_singlesketch, m)?)?;

Ok(())
}
Loading

0 comments on commit 99c0a87

Please sign in to comment.