Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MRG: refactor the internal Rust API to support container-level functionality #569

Merged
merged 43 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
04ea44b
refactor CSV output for fastgather/fastmultigather to use mpsc
ctb Jan 3, 2025
656f870
cargo fmt
ctb Jan 3, 2025
fd3ce53
tests mostly pass
ctb Jan 3, 2025
8bc9d33
fix skipmer test
ctb Jan 4, 2025
cc17722
upd comment
ctb Jan 4, 2025
c6a34f8
Merge branch 'fix_skip_test' into refactor_gather_csv
ctb Jan 4, 2025
88a6466
black
ctb Jan 4, 2025
e755b0b
Merge branch 'fix_skip_test' into refactor_gather_csv
ctb Jan 4, 2025
ec91bc1
switch to min_max_scaled for rocksdb
ctb Jan 4, 2025
42ecb2e
black
ctb Jan 4, 2025
3f40c6b
ensure overlap is > 0
ctb Jan 4, 2025
0e483ce
rm print
ctb Jan 4, 2025
ff40d6b
cleanup
ctb Jan 4, 2025
41f1b07
fix clippy messages about too-complex returns
ctb Jan 4, 2025
2f05442
cargo fmt
ctb Jan 4, 2025
1f44422
separate out Rust API changes from #552
ctb Jan 5, 2025
46da554
upd overlap
ctb Jan 5, 2025
052d551
Merge branch 'refactor_gather_csv' into factor_out_rust_api
ctb Jan 5, 2025
4a3ccaf
fix
ctb Jan 5, 2025
746ea88
fix
ctb Jan 5, 2025
8fded7a
remove remaining Box/error stuff
ctb Jan 5, 2025
f6019b1
remove remaining join => Err
ctb Jan 5, 2025
8c9b73f
clean up error handling a bit
ctb Jan 5, 2025
aadc9ac
refactor out loading of sketches
ctb Jan 6, 2025
ebbd67b
fix tests
ctb Jan 6, 2025
547484a
update docs
ctb Jan 6, 2025
bf9a5b7
upd
ctb Jan 6, 2025
089f6ab
Merge branch 'main' of github.com:sourmash-bio/sourmash_plugin_branch…
ctb Jan 7, 2025
6091cf8
break test again
ctb Jan 7, 2025
92d634a
do heinous dev stuff
ctb Jan 7, 2025
8fea8a7
fix fix comment
ctb Jan 7, 2025
4734806
Merge branch 'fix_skip_test' into refactor_gather_csv
ctb Jan 7, 2025
ea52473
upd
ctb Jan 7, 2025
38326e3
upd
ctb Jan 7, 2025
a1a646d
Merge branch 'main' of github.com:sourmash-bio/sourmash_plugin_branch…
ctb Jan 7, 2025
401562c
do not require -o after all
ctb Jan 7, 2025
6293ced
Merge branch 'refactor_gather_csv' into factor_out_rust_api
ctb Jan 7, 2025
55c810a
fix for create_empty_results
ctb Jan 7, 2025
bb82cf0
fmt
ctb Jan 7, 2025
4168c75
Merge branch 'main' of github.com:sourmash-bio/sourmash_plugin_branch…
ctb Jan 7, 2025
660b17f
remove @CTB notes
ctb Jan 7, 2025
6a41221
Merge branch 'main' of github.com:sourmash-bio/sourmash_plugin_branch…
ctb Jan 7, 2025
442af9d
remove obligatory clone from Rust layer
ctb Jan 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/check.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::utils::is_revindex_database;
use anyhow::Result;

use sourmash::index::revindex::{RevIndex, RevIndexOps};

pub fn check(index: camino::Utf8PathBuf, quick: bool) -> Result<(), Box<dyn std::error::Error>> {
pub fn check(index: camino::Utf8PathBuf, quick: bool) -> Result<()> {
if !is_revindex_database(&index) {
bail!("'{}' is not a valid RevIndex database", index);
}
Expand Down
6 changes: 3 additions & 3 deletions src/fastgather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ pub fn fastgather(
)
.ok();

if let Err(e) = gather_out_thrd.join() {
eprintln!("Unable to join internal thread: {:?}", e);
}
gather_out_thrd
.join()
.expect("Unable to join internal thread");

Ok(())
}
78 changes: 49 additions & 29 deletions src/fastmultigather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use sourmash::sketch::Sketch;

use crate::utils::{
consume_query_by_gather, csvwriter_thread, load_collection, write_prefetch,
BranchwaterGatherResult, PrefetchResult, ReportType,
BranchwaterGatherResult, MultiCollection, PrefetchResult, ReportType, SmallSignature,
};

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -81,9 +81,46 @@ pub fn fastmultigather(
ReportType::Against,
allow_failed_sigpaths,
)?;
// load against sketches into memory
let against = against_collection.load_sketches()?;

let against_sketches = against_collection.load_sketches()?;

let (n_processed, skipped_paths, failed_paths) = fastmultigather_obj(
&query_collection,
&against_sketches,
save_matches,
output_path,
threshold_hashes,
common_scaled,
create_empty_results,
)?;

println!("DONE. Processed {} queries total.", n_processed);

if skipped_paths > 0 {
eprintln!(
"WARNING: skipped {} query paths - no compatible signatures.",
skipped_paths
);
}
if failed_paths > 0 {
eprintln!(
"WARNING: {} query paths failed to load. See error messages above.",
failed_paths
);
}

Ok(())
}

pub(crate) fn fastmultigather_obj(
query_collection: &MultiCollection,
against: &Vec<SmallSignature>,
save_matches: bool,
output_path: Option<String>,
threshold_hashes: u64,
common_scaled: u32,
create_empty_results: bool,
) -> Result<(usize, usize, usize)> {
// set up a multi-producer, single-consumer channel.
let (send, recv) =
std::sync::mpsc::sync_channel::<BranchwaterGatherResult>(rayon::current_num_threads());
Expand Down Expand Up @@ -222,30 +259,13 @@ pub fn fastmultigather(
});

drop(send);
if let Err(e) = gather_out_thrd.join() {
eprintln!("Unable to join internal thread: {:?}", e);
}

println!(
"DONE. Processed {} queries total.",
processed_queries.into_inner()
);

let skipped_paths = skipped_paths.into_inner();
let failed_paths = failed_paths.into_inner();

if skipped_paths > 0 {
eprintln!(
"WARNING: skipped {} query paths - no compatible signatures.",
skipped_paths
);
}
if failed_paths > 0 {
eprintln!(
"WARNING: {} query paths failed to load. See error messages above.",
failed_paths
);
}

Ok(())
gather_out_thrd
.join()
.expect("unable to join CSV writing thread!?");

Ok((
processed_queries.into_inner(),
skipped_paths.into_inner(),
failed_paths.into_inner(),
))
}
92 changes: 47 additions & 45 deletions src/fastmultigather_rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use sourmash::sketch::minhash::KmerMinHash;
use sourmash::storage::SigStore;

use crate::utils::{
csvwriter_thread, is_revindex_database, load_collection, BranchwaterGatherResult, ReportType,
csvwriter_thread, is_revindex_database, load_collection, BranchwaterGatherResult,
MultiCollection, ReportType,
};

pub fn fastmultigather_rocksdb(
Expand All @@ -22,7 +23,7 @@ pub fn fastmultigather_rocksdb(
threshold_bp: u32,
output: Option<String>,
allow_failed_sigpaths: bool,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<()> {
if !is_revindex_database(&index) {
bail!("'{}' is not a valid RevIndex database", index);
}
Expand All @@ -39,7 +40,9 @@ pub fn fastmultigather_rocksdb(
let selection_scaled: u32 = match selection.scaled() {
Some(scaled) => {
if *max_db_scaled > scaled {
return Err("Error: database scaled is higher than requested scaled".into());
return Err(anyhow::anyhow!(
"Error: database scaled is higher than requested scaled"
));
}
scaled
}
Expand All @@ -59,6 +62,34 @@ pub fn fastmultigather_rocksdb(
allow_failed_sigpaths,
)?;

let (n_processed, skipped_paths, failed_paths) =
fastmultigather_rocksdb_obj(&query_collection, &db, &set_selection, threshold_bp, output)?;

println!("DONE. Processed {} queries total.", n_processed);

if skipped_paths > 0 {
eprintln!(
"WARNING: skipped {} query paths - no compatible signatures.",
skipped_paths
);
}
if failed_paths > 0 {
eprintln!(
"WARNING: {} query paths failed to load. See error messages above.",
failed_paths
);
}

Ok(())
}

pub(crate) fn fastmultigather_rocksdb_obj(
query_collection: &MultiCollection,
db: &RevIndex,
selection: &Selection,
threshold_bp: u32,
output: Option<String>,
) -> Result<(usize, usize, usize)> {
// set up a multi-producer, single-consumer channel.
let (send, recv) =
std::sync::mpsc::sync_channel::<BranchwaterGatherResult>(rayon::current_num_threads());
Expand All @@ -80,8 +111,8 @@ pub fn fastmultigather_rocksdb(
let send = query_collection
.par_iter()
.filter_map(|(coll, _idx, record)| {
let threshold = threshold_bp / set_selection.scaled().expect("scaled is not set!?");
let ksize = set_selection.ksize().expect("ksize not set!?");
let threshold = threshold_bp / selection.scaled().expect("scaled is not set!?");
let ksize = selection.ksize().expect("ksize not set!?");

// query downsampling happens here
match coll.sig_from_record(record) {
Expand All @@ -103,7 +134,7 @@ pub fn fastmultigather_rocksdb(
hash_to_color,
threshold as usize,
&query_mh,
Some(set_selection.clone()),
Some(selection.clone()),
);
if let Ok(matches) = matches {
for match_ in &matches {
Expand Down Expand Up @@ -178,55 +209,26 @@ pub fn fastmultigather_rocksdb(
.flatten()
.try_for_each_with(send, |s, m| s.send(m));

let mut do_fail = false;

// do some cleanup and error handling -
if let Err(e) = send {
eprintln!("Unable to send internal data: {:?}", e);
do_fail = true;
}

if let Err(e) = thrd.join() {
eprintln!("Unable to join internal thread: {:?}", e);
do_fail = true;
}
send.expect("Unable to send internal data");
thrd.join().expect("Unable to join CSV writing thread.");

// done!
let i: usize = processed_sigs.fetch_max(0, atomic::Ordering::SeqCst);
eprintln!("DONE. Processed {} search sigs", i);

if i == 0 {
eprintln!("ERROR: no search sigs found!?");
do_fail = true;
}

let n_processed: usize = processed_sigs.fetch_max(0, atomic::Ordering::SeqCst);
let skipped_paths = skipped_paths.load(atomic::Ordering::SeqCst);
let failed_paths = failed_paths.load(atomic::Ordering::SeqCst);
let failed_gathers = failed_gathers.load(atomic::Ordering::SeqCst);

if skipped_paths > 0 {
eprintln!(
"WARNING: skipped {} query paths - no compatible signatures.",
skipped_paths
);
}
if failed_paths > 0 {
eprintln!(
"WARNING: {} query paths failed to load. See error messages above.",
failed_paths
);
if n_processed == 0 {
return Err(anyhow::anyhow!("no search sigs found!?"));
}

if failed_gathers > 0 {
eprintln!(
"ERROR: {} failed gathers. See error messages above.",
return Err(anyhow::anyhow!(
"{} failed gathers. See error messages above.",
failed_gathers
);
do_fail = true;
));
}

if do_fail {
bail!("Unresolvable errors found; results cannot be trusted. Quitting.");
}

Ok(())
Ok((n_processed, skipped_paths, failed_paths))
}
18 changes: 15 additions & 3 deletions src/index.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
use anyhow::Result;

use sourmash::index::revindex::RevIndex;
use sourmash::index::revindex::RevIndexOps;
use sourmash::prelude::*;
use std::path::Path;

use crate::utils::MultiCollection;
use crate::utils::{load_collection, ReportType};
use sourmash::collection::{Collection, CollectionSet};

pub fn index<P: AsRef<Path>>(
siglist: String,
selection: Selection,
output: P,
colors: bool,
use_colors: bool,
allow_failed_sigpaths: bool,
use_internal_storage: bool,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<()> {
eprintln!("Loading sketches from {}", siglist);

let multi = match load_collection(
Expand All @@ -27,6 +30,15 @@ pub fn index<P: AsRef<Path>>(
};
eprintln!("Found {} sketches total.", multi.len());

index_obj(multi, output, use_colors, use_internal_storage)
}

pub(crate) fn index_obj<P: AsRef<Path>>(
multi: MultiCollection,
output: P,
use_colors: bool,
use_internal_storage: bool,
) -> Result<()> {
// Try to convert it into a Collection and then CollectionSet.
let collection = match Collection::try_from(multi.clone()) {
// conversion worked!
Expand Down Expand Up @@ -54,7 +66,7 @@ pub fn index<P: AsRef<Path>>(
match collection {
Ok(collection) => {
eprintln!("Indexing {} sketches.", collection.len());
let mut index = RevIndex::create(output.as_ref(), collection, colors)?;
let mut index = RevIndex::create(output.as_ref(), collection, use_colors)?;

if use_internal_storage {
index.internalize_storage()?;
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,5 +380,6 @@ fn sourmash_plugin_branchwater(_py: Python, m: &Bound<'_, PyModule>) -> PyResult
m.add_function(wrap_pyfunction!(do_pairwise, m)?)?;
m.add_function(wrap_pyfunction!(do_cluster, m)?)?;
m.add_function(wrap_pyfunction!(do_singlesketch, m)?)?;

Ok(())
}
Loading
Loading