diff --git a/.cargo/config.toml b/.cargo/config.toml
new file mode 100644
index 00000000..a05a706a
--- /dev/null
+++ b/.cargo/config.toml
@@ -0,0 +1,2 @@
+[build]
+rustdocflags = ["--document-private-items"]
diff --git a/Cargo.toml b/Cargo.toml
index 2c40a4de..80103eef 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,7 +16,7 @@ sourmash = { version = "0.16.0", features = ["branchwater"] }
serde_json = "1.0.128"
niffler = "2.4.0"
log = "0.4.22"
-env_logger = { version = "0.11.5", optional = true }
+env_logger = { version = "0.11.5" }
simple-error = "0.3.1"
anyhow = "1.0.89"
zip = { version = "2.0", default-features = false }
diff --git a/doc/README.md b/doc/README.md
index b0c68edd..6da3b286 100644
--- a/doc/README.md
+++ b/doc/README.md
@@ -35,7 +35,7 @@ polish and user experience.

sourmash supports a variety of different storage formats for sketches (see [sourmash docs](https://sourmash.readthedocs.io/en/latest/command-line.html#choosing-signature-output-formats)), and the branchwater plugin works with some (but not all) of them. Branchwater _also_ supports an additional database type, a RocksDB-based inverted index, that is not (yet) supported natively by sourmash (through v4.8.11).

-
+**As of v0.9.8, we recommend using zip files or standalone manifest CSVs pointing to zip files whenever you need to provide multiple sketches.**

| command | command input | database format |
| -------- | -------- | -------- |
@@ -54,62 +54,105 @@ When working with large collections of small sketches such as genomes, we sugges

* sketches are compressed in zip files;
* zip files can contain many sketches, including incompatible types (e.g. multiple k-mer sizes);
-* zip files contain "manifests" listing their contents;
-* subsets of zip files can be efficiently selected and loaded depending on what is needed;
+* subsets of zip files can be efficiently selected and loaded;
* in particular, _single_ sketches can be loaded on demand, supporting lower memory requirements for certain kinds of searches.

-For all these reasons, zip files are the most efficient and effective basic storage type for sketches in sourmash, and as of the branchwater plugin v0.9.0, they are fully supported!
+For all these reasons, zip files are the most efficient and effective
+basic storage type for sketches in sourmash, and as of the branchwater
+plugin v0.9.0, they are fully supported!

You can create zipfiles with sourmash like so:
```
sourmash sig cat <list of sketches> -o sigs.zip
```
-
+Standalone manifest CSVs can be created with `sourmash sig check` and
+`sourmash sig collect`. Running
+`sourmash sig check <list of sketches> --picklist <picklist spec> -m manifest.csv`
+will create a manifest CSV containing a subset of sketches [using picklists](https://sourmash.readthedocs.io/en/latest/command-line.html#using-picklists-to-subset-large-collections-of-signatures),
+
+and
+
+```
+find /path/to/sig/files -type f > pathlist.txt
+sourmash sig collect pathlist.txt -o summary-manifest.csv -F csv
+```
+will collect a list of all of the sketches under `/path/to/sig/files`
+and make the list available through a combined manifest.
+
+Note here that manifests are _much_ smaller than the files containing all
+of the sketches!
+
+Note also that manifests have many advantages over pathlists: in
+particular, they contain metadata that enables fast loading of
+specific sketches, and they support subsetting from large databases;
+pathlists support neither.

### Using RocksDB inverted indexes

-The branchwater plugin also supports a database type that is not yet supported by sourmash: inverted indexes stored in a RocksDB database.
These indexes provide fast and low-memory lookups when searching very large datasets, and are used for the branchwater petabase scale search hosted at [branchwater.sourmash.bio](https://branchwater.sourmash.bio).
+The branchwater plugin also supports a database type that is not yet
+supported by sourmash: inverted indexes stored in a RocksDB
+database. These indexes provide fast and low-memory lookups when
+searching very large datasets, and are used for the branchwater
+petabase scale search hosted at
+[branchwater.sourmash.bio](https://branchwater.sourmash.bio).

-Some commands - `fastmultigather` and `manysearch` - support using these RocksDB-based inverted indexes. They can be created by running `sourmash scripts index`. See [the `index` documentation, below](#Running-index).
+Some commands - `fastmultigather` and `manysearch` - support using
+these RocksDB-based inverted indexes for efficient search, and they
+can be created by running `sourmash scripts index`. See
+[the `index` documentation, below](#Running-index).

### Using "pathlists"

-
+**Note: We no longer recommend using "pathlists". Use zip files or
+ standalone manifests instead.**

You can make a pathlist by listing a collection of .sig.gz files like so:
```
find /path/to/directory/ -name "*.sig.gz" -type f > directory.txt
```

-When using a pathlist for search, we load all signatures into memory at the start in order to generate a manifest. To avoid memory issues, the signatures are not kept in memory, but instead re-loaded as described below for each command (see: Notes on concurrency and efficiency). This makes using pathlists less efficient than `zip` files (as of v0.9.0).
-
-
-
+When using a pathlist for search, we load all signatures into memory
+at the start in order to generate a manifest. To avoid memory issues,
+the signatures are not kept in memory, but instead re-loaded as
+described below for each command (see: Notes on concurrency and
+efficiency). This makes using pathlists less efficient than `zip`
+files (as of v0.9.0) or manifests (as of v0.9.8).

## Running the commands

@@ -231,34 +274,37 @@ cat input.fa | sourmash scripts singlesketch - -o -

### Running `multisearch` and `pairwise`

-The `multisearch` command compares one or more query genomes, and one or more subject genomes. It differs from `manysearch` by loading all genomes into memory.
+The `multisearch` command compares one or more query genomes against one or more subject genomes. It differs from `manysearch` because it loads everything into memory.

`multisearch` takes two input collections and outputs a CSV:
```
sourmash scripts multisearch query.sig.gz database.zip -o results.csv
```

+The results file, `results.csv`, will have 8 columns: `query` and
+`query_md5`, `match` and `match_md5`, and `containment`, `jaccard`,
+`max_containment`, and `intersect_hashes`.

-The results file `results.csv`, will have 8 columns: `query` and `query_md5`, `match` and `match_md5`, and `containment`, `jaccard`, `max_containment`, and `intersect_hashes`.
-
-The `pairwise` command does the same comparisons as `multisearch` but takes
-only a single collection of sketches, for which it calculates all the pairwise comparisons. Since the comparisons are symmetric, it is approximately
-twice as fast as `multisearch`.
+The `pairwise` command does the same comparisons as `multisearch` but
+takes only a single collection of sketches, for which it calculates
+all the pairwise comparisons. Since the comparisons are symmetric, it
+is approximately twice as fast as `multisearch`.
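+
+For example, a minimal invocation might look like this (a sketch only;
+`collection.zip` here is a placeholder for any zip file of compatible
+sketches built as described above):
+
+```
+sourmash scripts pairwise collection.zip -o pairwise.csv
+```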
The `-t/--threshold` for `multisearch` and `pairwise` applies to the
-containment of query-in-target and defaults to 0.01. To report
-_any_ overlap between two sketches, set the threshold to 0.
+containment of query-in-target and defaults to 0.01. To report _any_
+overlap between two sketches, set the threshold to 0.

### Running `fastgather`

-The `fastgather` command is a much faster version of `sourmash gather`.
+The `fastgather` command is a parallelized (and typically much faster)
+version of `sourmash gather`.

`fastgather` takes a single query metagenome and a database, and outputs a CSV:
```
sourmash scripts fastgather query.sig.gz database.zip -o results.csv --cores 4
```

-As of v0.9.5, `fastgather` outputs the same columns as `sourmash gather`, with only a few exceptions:
+As of v0.9.5, `fastgather` outputs the same columns as `sourmash gather`, with only a few exceptions:
* `match_name` is output instead of `name`;
* `match_md5` is output instead of `md5`;
* `match_filename` is output instead of `filename`, and the value is different;
@@ -270,33 +316,56 @@ As of v0.9.5, `fastgather` outputs the same columns as `sourmash gather`, with o

```
sourmash scripts fastmultigather queries.manifest.csv database.zip --cores 4 --save-matches
```
-
-The main advantage that `fastmultigather` has over running `fastgather` on multiple queries is that `fastmultigather` only needs to load the database once for all queries, unlike with `fastgather`; this can be a significant time savings for large databases!
+We suggest using standalone manifest CSVs wherever possible, especially if
+the queries are large.
+
+The main advantage that `fastmultigather` has over running
+`fastgather` on multiple queries is that `fastmultigather` only needs
+to load the database once for all queries, unlike with `fastgather`;
+this can be a significant time savings for large databases.

#### Output files for `fastmultigather`

-On a database of sketches (but not on RocksDB indexes) `fastmultigather` will output two CSV files for each query, a `prefetch` file containing all overlapping matches between that query and the database, and a `gather` file containing the minimum metagenome cover for that query in the database.
+On a database of sketches (but not on RocksDB indexes)
+`fastmultigather` will output two CSV files for each query: a
+`prefetch` file containing all overlapping matches between that query
+and the database, and a `gather` file containing the minimum
+metagenome cover for that query in the database.

-The prefetch CSV will be named `{signame}.prefetch.csv`, and the gather CSV will be named `{signame}.gather.csv`. Here, `{signame}` is the name of your sourmash signature.
+The prefetch CSV will be named `{signame}.prefetch.csv`, and the
+gather CSV will be named `{signame}.gather.csv`. Here, `{signame}` is
+the name of your sourmash signature.

-`--save-matches` is an optional flag that will save the matched hashes for each query in a separate sourmash signature `{signame}.matches.sig`. This can be useful for debugging or for further analysis.
+`--save-matches` is an optional flag that will save the matched hashes
+for each query in a separate sourmash signature
+`{signame}.matches.sig`. This can be useful for debugging or for
+further analysis.

-When searching against a RocksDB index, `fastmultigather` will output a single file containing all gather results, specified with `-o/--output`. No prefetch results will be output.
+When searching against a RocksDB index, `fastmultigather` will output
+a single file containing all gather results, specified with
+`-o/--output`. No prefetch results will be output.

`fastmultigather` gather CSVs provide the same columns as `fastgather`, above.

-**Warning:** At the moment, if two different queries have the same `{signame}`, the CSVs for one of the queries will be overwritten by the other query. The behavior here is undefined in practice, because of multithreading: we don't know what queries will be executed when or files will be written first.
+**Warning:** At the moment, if two different queries have the same
+ `{signame}`, the CSVs for one of the queries will be overwritten by
+ the other query. The behavior here is undefined in practice, because
+ of multithreading: we don't know when each query will be executed or
+ which files will be written first.

### Running `manysearch`

-The `manysearch` command compares one or more collections of query sketches, and one or more collections of subject sketches. It is the core command we use for searching petabase-scale databases of metagenomes for contained genomes.
+The `manysearch` command compares one or more collections of query
+sketches against one or more collections of subject sketches. It is
+the core command we use for searching petabase-scale databases of
+metagenomes for contained genomes.

`manysearch` takes two collections as input, and outputs a CSV:
```
sourmash scripts manysearch queries.zip metagenomes.manifest.csv -o results.csv
```

-
+We suggest using a manifest CSV for the metagenome collection.

The results file here, `query.x.gtdb-reps.csv`, will have the
following columns: `query`, `query_md5`, `match_name`, `match_md5`,
@@ -310,7 +379,8 @@ following columns: , `match_containment_ani`,

Finally, if using sketches that have abundance information, the
results file will also contain the following columns: `average_abund`,
-`median_abund`, `std_abund`, `n_weighted_found`, and `total_weighted_hashes`.
+`median_abund`, `std_abund`, `n_weighted_found`, and
+`total_weighted_hashes`.

See
[the prefetch CSV output column documentation](https://sourmash.readthedocs.io/en/latest/classifying-signatures.html#appendix-e-prefetch-csv-output-columns)
@@ -328,11 +398,16 @@ when executing large searches.

### Running `cluster`

-The `cluster` command conducts graph-based clustering via the sequence similarity measures in `pairwise` or `multisearch` outputs. It is a new command and we are exploring its utility.
+The `cluster` command conducts graph-based clustering via the sequence
+similarity measures in `pairwise` or `multisearch` outputs. It is a
+new command and we are exploring its utility.

-`cluster` takes the csv output of `pairwise` or `multisearch` input, and outputs two CSVs:
+`cluster` takes the CSV output of `pairwise` or `multisearch` as
+input, and outputs two CSVs:

-1. `-o`, `--output` will contain the names of the clusters and the `ident` of each sequence included in the cluster (e.g. `Component_1, name1;name2`)
+1. `-o`, `--output` will contain the names of the clusters and the
+   `ident` of each sequence included in the cluster
+   (e.g. `Component_1, name1;name2`)

```
cluster,nodes
@@ -340,7 +415,9 @@ Component_1,name1;name2;name3
Component_2,name4
```

-2. `--cluster-sizes` will contain information on cluster size, with a counts for the number of clusters of that size. For the two clusters above, the counts would look like this:
+2. `--cluster-sizes` will contain information on cluster size, with a
+   count of the number of clusters of that size. For the two
+   clusters above, the counts would look like this:

```
cluster_size,count
@@ -348,13 +425,17 @@ cluster_size,count
1,1
```

-`cluster` takes a `--similarity_column` argument to specify which of the similarity columns, with the following choices: `containment`, `max_containment`, `jaccard`, `average_containment_ani`, `maximum_containment_ani`. All values should be input as fractions (e.g. 0.9 for 90%)
+`cluster` takes a `--similarity_column` argument to specify which
+similarity column to use, with the following choices: `containment`,
+`max_containment`, `jaccard`, `average_containment_ani`,
+`maximum_containment_ani`. All values should be input as fractions
+(e.g. 0.9 for 90%).

### Running `index`

The `index` subcommand creates a RocksDB inverted index that can be
-used as a database for `manysearch` (containment queries into
-mixtures) and `fastmultigather` (mixture decomposition against a
+used as an efficient database for `manysearch` (containment queries
+into mixtures) and `fastmultigather` (mixture decomposition against a
database of genomes).

RocksDB inverted indexes support fast, low-latency, and low-memory
@@ -378,6 +459,15 @@ disk space required for large databases. You can provide an optional
reduces the disk space needed for the index. Read below for technical
details!

+As of v0.9.8, `index` can take any of the supported input types, but
+unless you are using a zip file or a pathlist of JSON files, it may
+need to load all the sketches into memory before indexing
+them. Moreover, you can only use external storage with a zip file. We
+are working on improving this; see
+[issue #415](https://github.com/sourmash-bio/sourmash_plugin_branchwater/issues/415)
+for details. A warning will be printed to stderr in situations where
+the sketches are being loaded into memory.
+
#### Internal vs external storage of sketches in a RocksDB index

(The below applies to v0.9.7 and later of the plugin; for v0.9.6 and
@@ -426,9 +516,10 @@ if better support for relative paths is of interest!

#### Links and more materials

Note that RocksDB indexes are implemented in the core
-[sourmash Rust library](https://crates.io/crates/sourmash), and
-used in downstream software packages (this plugin, and
-[the branchwater application code](https://github.com/sourmash-bio/branchwater)). The above documentation applies to sourmash core v0.15.0.
+[sourmash Rust library](https://crates.io/crates/sourmash), and used
+in downstream software packages (this plugin, and
+[the branchwater application code](https://github.com/sourmash-bio/branchwater)).
+The above documentation applies to sourmash core v0.15.0.

## Notes on concurrency and efficiency

diff --git a/src/fastgather.rs b/src/fastgather.rs
index 46512025..686e702e 100644
--- a/src/fastgather.rs
+++ b/src/fastgather.rs
@@ -33,7 +33,9 @@ pub fn fastgather(
        )
    }
    // get single query sig and minhash
-    let query_sig = query_collection.sig_for_dataset(0)?; // need this for original md5sum
+    let query_sig = query_collection.get_first_sig().expect("no queries!?");
+
+    // @CTB avoid clone?
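+    // (`select()` consumes the signature, so we clone before downsampling;
+    // `query_sig` itself is still needed below for its name and md5sum.)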
let query_sig_ds = query_sig.clone().select(selection)?; // downsample let query_mh = match query_sig_ds.minhash() { Some(query_mh) => query_mh, @@ -89,7 +91,17 @@ pub fn fastgather( } if prefetch_output.is_some() { - write_prefetch(&query_sig, prefetch_output, &matchlist).ok(); + let query_filename = query_sig.filename(); + let query_name = query_sig.name(); + let query_md5 = query_sig.md5sum(); + write_prefetch( + query_filename, + query_name, + query_md5, + prefetch_output, + &matchlist, + ) + .ok(); } // run the gather! diff --git a/src/fastmultigather.rs b/src/fastmultigather.rs index 1876d536..379b9cfe 100644 --- a/src/fastmultigather.rs +++ b/src/fastmultigather.rs @@ -2,7 +2,7 @@ use anyhow::Result; use rayon::prelude::*; -use sourmash::prelude::ToWriter; +use sourmash::prelude::{Storage, ToWriter}; use sourmash::{selection::Selection, signature::SigsTrait}; use std::sync::atomic; @@ -15,13 +15,14 @@ use camino::Utf8Path as PathBuf; use std::collections::HashSet; use std::fs::File; +use log::trace; + use sourmash::signature::Signature; use sourmash::sketch::minhash::KmerMinHash; use sourmash::sketch::Sketch; use crate::utils::{ - consume_query_by_gather, load_collection, load_sketches, write_prefetch, PrefetchResult, - ReportType, + consume_query_by_gather, load_collection, write_prefetch, PrefetchResult, ReportType, }; pub fn fastmultigather( @@ -34,6 +35,8 @@ pub fn fastmultigather( save_matches: bool, create_empty_results: bool, ) -> Result<()> { + let _ = env_logger::try_init(); + // load query collection let query_collection = load_collection( &query_filepath, @@ -50,8 +53,7 @@ pub fn fastmultigather( 1 } } - .try_into() - .unwrap(); + .try_into()?; println!("threshold overlap: {} {}", threshold_hashes, threshold_bp); @@ -63,123 +65,130 @@ pub fn fastmultigather( allow_failed_sigpaths, )?; // load against sketches into memory, downsampling on the way - let against = load_sketches(against_collection, selection, ReportType::Against).unwrap(); + let against = against_collection.load_sketches(selection)?; // Iterate over all queries => do prefetch and gather! let processed_queries = AtomicUsize::new(0); let skipped_paths = AtomicUsize::new(0); let failed_paths = AtomicUsize::new(0); - query_collection.par_iter().for_each(|(_idx, record)| { + query_collection.par_iter().for_each(|(c, _idx, record)| { // increment counter of # of queries. q: could we instead use the _idx from par_iter(), or will it vary based on thread? 
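+        // note: fetch_add returns the previous value, so `_i` is a unique
+        // per-query counter even though queries are processed in parallel.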
            let _i = processed_queries.fetch_add(1, atomic::Ordering::SeqCst);
            // Load query sig (downsampling happens here)
-            match query_collection.sig_from_record(record) {
+            trace!(
+                "fastmultigather query load: from:{} idx:{} loc:{}",
+                c.storage().spec(),
+                _idx,
+                record.internal_location()
+            );
+            match c.sig_from_record(record) {
                Ok(query_sig) => {
                    let name = query_sig.name();
                    let prefix = name.split(' ').next().unwrap_or_default().to_string();
                    let location = PathBuf::new(&prefix).file_name().unwrap();
-                    if let Some(query_mh) = query_sig.minhash() {
-                        let mut matching_hashes = if save_matches { Some(Vec::new()) } else { None };
-                        let matchlist: BinaryHeap<PrefetchResult> = against
-                            .iter()
-                            .filter_map(|against| {
-                                let mut mm: Option<PrefetchResult> = None;
-                                if let Ok(overlap) = against.minhash.count_common(query_mh, false) {
-                                    if overlap >= threshold_hashes {
-                                        if save_matches {
-                                            if let Ok(intersection) =
-                                                against.minhash.intersection(query_mh)
-                                            {
-                                                matching_hashes
-                                                    .as_mut()
-                                                    .unwrap()
-                                                    .extend(intersection.0);
-                                            }
-                                        }
-                                        let result = PrefetchResult {
-                                            name: against.name.clone(),
-                                            md5sum: against.md5sum.clone(),
-                                            minhash: against.minhash.clone(),
-                                            location: against.location.clone(),
-                                            overlap,
-                                        };
-                                        mm = Some(result);
-                                    }
-                                }
-                                mm
-                            })
-                            .collect();
-                        if !matchlist.is_empty() {
-                            let prefetch_output = format!("{}.prefetch.csv", location);
-                            let gather_output = format!("{}.gather.csv", location);
-                            // Save initial list of matches to prefetch output
-                            write_prefetch(&query_sig, Some(prefetch_output), &matchlist).ok();
-
-                            // Now, do the gather!
-                            consume_query_by_gather(
-                                query_sig.clone(),
-                                scaled as u64,
-                                matchlist,
-                                threshold_hashes,
-                                Some(gather_output),
-                            )
-                            .ok();
-
-                            // Save matching hashes to .sig file if save_matches is true
-                            if save_matches {
-                                if let Some(hashes) = matching_hashes {
-                                    let sig_filename = format!("{}.matches.sig", name);
-                                    if let Ok(mut file) = File::create(&sig_filename) {
-                                        let unique_hashes: HashSet<u64> = hashes.into_iter().collect();
-                                        let mut new_mh = KmerMinHash::new(
-                                            query_mh.scaled().try_into().unwrap(),
-                                            query_mh.ksize().try_into().unwrap(),
-                                            query_mh.hash_function().clone(),
-                                            query_mh.seed(),
-                                            false,
-                                            query_mh.num(),
-                                        );
-                                        new_mh
-                                            .add_many(&unique_hashes.into_iter().collect::<Vec<_>>())
-                                            .ok();
-                                        let mut signature = Signature::default();
-                                        signature.push(Sketch::MinHash(new_mh));
-                                        signature.set_filename(&name);
-                                        if let Err(e) = signature.to_writer(&mut file) {
-                                            eprintln!("Error writing signature file: {}", e);
+                    let query_filename = query_sig.filename();
+                    let query_name = query_sig.name();
+                    let query_md5 = query_sig.md5sum();
+
+                    let query_mh = query_sig.minhash().expect("cannot get sketch");
+                    let mut matching_hashes = if save_matches { Some(Vec::new()) } else { None };
+                    let matchlist: BinaryHeap<PrefetchResult> = against
+                        .iter()
+                        .filter_map(|against| {
+                            let mut mm: Option<PrefetchResult> = None;
+                            if let Ok(overlap) = against.minhash.count_common(query_mh, false) {
+                                if overlap >= threshold_hashes {
+                                    if save_matches {
+                                        if let Ok(intersection) = against.minhash.intersection(query_mh)
+                                        {
+                                            matching_hashes.as_mut().unwrap().extend(intersection.0);
                                        }
-                                    } else {
-                                        eprintln!("Error creating signature file: {}", sig_filename);
                                    }
+                                    let result = PrefetchResult {
+                                        name: against.name.clone(),
+                                        md5sum: against.md5sum.clone(),
+                                        minhash: against.minhash.clone(),
+                                        location: against.location.clone(),
+                                        overlap,
+                                    };
+                                    mm = Some(result);
                                }
                            }
-                        } else {
-                            println!("No matches to '{}'", location);
-                            if create_empty_results {
-                                let prefetch_output = format!("{}.prefetch.csv", location);
-                                let gather_output = format!("{}.gather.csv", location);
-                                // touch output files
-                                match std::fs::File::create(&prefetch_output) {
-                                    Ok(_) => {}
-                                    Err(e) => {
-                                        eprintln!("Failed to create empty prefetch output: {}", e)
-                                    }
-                                }
-                                match std::fs::File::create(&gather_output) {
-                                    Ok(_) => {}
-                                    Err(e) => eprintln!("Failed to create empty gather output: {}", e),
-                                }
-                            }
+                            mm
+                        })
+                        .collect();
+
+                    if !matchlist.is_empty() {
+                        let prefetch_output = format!("{}.prefetch.csv", location);
+                        let gather_output = format!("{}.gather.csv", location);
+
+                        // Save initial list of matches to prefetch output
+                        write_prefetch(
+                            query_filename,
+                            query_name,
+                            query_md5,
+                            Some(prefetch_output),
+                            &matchlist,
+                        )
+                        .ok();
+
+                        // Now, do the gather!
+                        consume_query_by_gather(
+                            query_sig.clone(),
+                            scaled as u64,
+                            matchlist,
+                            threshold_hashes,
+                            Some(gather_output),
+                        )
+                        .ok();
+
+                        // Save matching hashes to .sig file if save_matches is true
+                        if save_matches {
+                            if let Some(hashes) = matching_hashes {
+                                let sig_filename = format!("{}.matches.sig", name);
+                                if let Ok(mut file) = File::create(&sig_filename) {
+                                    let unique_hashes: HashSet<u64> = hashes.into_iter().collect();
+                                    let mut new_mh = KmerMinHash::new(
+                                        query_mh.scaled(),
+                                        query_mh.ksize().try_into().unwrap(),
+                                        query_mh.hash_function().clone(),
+                                        query_mh.seed(),
+                                        false,
+                                        query_mh.num(),
+                                    );
+                                    new_mh
+                                        .add_many(&unique_hashes.into_iter().collect::<Vec<_>>())
+                                        .ok();
+                                    let mut signature = Signature::default();
+                                    signature.push(Sketch::MinHash(new_mh));
+                                    signature.set_filename(&name);
+                                    if let Err(e) = signature.to_writer(&mut file) {
+                                        eprintln!("Error writing signature file: {}", e);
                                    }
+                                } else {
+                                    eprintln!("Error creating signature file: {}", sig_filename);
+                                }
+                            }
+                        }
+                    } else {
+                        println!("No matches to '{}'", location);
+                        if create_empty_results {
+                            let prefetch_output = format!("{}.prefetch.csv", location);
+                            let gather_output = format!("{}.gather.csv", location);
+                            // touch output files
+                            match std::fs::File::create(&prefetch_output) {
+                                Ok(_) => {}
+                                Err(e) => {
+                                    eprintln!("Failed to create empty prefetch output: {}", e)
+                                }
+                            }
+                            match std::fs::File::create(&gather_output) {
+                                Ok(_) => {}
+                                Err(e) => eprintln!("Failed to create empty gather output: {}", e),
+                            }
+                        }
                    }
                }
                Err(_) => {
diff --git a/src/index.rs b/src/index.rs
index 0cb6a97d..c303b09b 100644
--- a/src/index.rs
+++ b/src/index.rs
@@ -4,6 +4,7 @@ use sourmash::prelude::*;
use std::path::Path;

use crate::utils::{load_collection, ReportType};
+use sourmash::collection::{Collection, CollectionSet};

pub fn index<P: AsRef<Path>>(
    siglist: String,
@@ -13,24 +14,53 @@
    allow_failed_sigpaths: bool,
    use_internal_storage: bool,
) -> Result<(), Box<dyn std::error::Error>> {
-    println!("Loading siglist");
+    eprintln!("Loading sketches from {}", siglist);

-    let collection = load_collection(
+    let multi = match load_collection(
        &siglist,
        selection,
        ReportType::General,
        allow_failed_sigpaths,
-    )?;
+    ) {
+        Ok(multi) => multi,
+        Err(err) => return Err(err.into()),
+    };
+    eprintln!("Found {} sketches total.", multi.len());

-    let mut index = RevIndex::create(
-        output.as_ref(),
-        collection.select(selection)?.try_into()?,
-        colors,
-    )?;
+    // Try to convert it into a Collection and then CollectionSet.
+    let collection = match Collection::try_from(multi.clone()) {
+        // conversion worked!
+        Ok(c) => {
+            let cs: CollectionSet = c.select(selection)?.try_into()?;
+            Ok(cs)
+        }
+        // conversion failed; can we/should we load it into memory?
+        Err(_) => {
+            if use_internal_storage {
+                eprintln!("WARNING: loading all sketches into memory in order to index.");
+                eprintln!("See 'index' documentation for details.");
+                let c: Collection = multi.load_all_sigs(selection)?;
+                let cs: CollectionSet = c.try_into()?;
+                Ok(cs)
+            } else {
+                Err(
+                    anyhow::anyhow!("cannot index this type of collection with external storage")
+                        .into(),
+                )
+            }
+        }
+    };

-    if use_internal_storage {
-        index.internalize_storage()?;
-    }
+    match collection {
+        Ok(collection) => {
+            eprintln!("Indexing {} sketches.", collection.len());
+            let mut index = RevIndex::create(output.as_ref(), collection, colors)?;

-    Ok(())
+            if use_internal_storage {
+                index.internalize_storage()?;
+            }
+            Ok(())
+        }
+        Err(e) => Err(e),
+    }
}
diff --git a/src/lib.rs b/src/lib.rs
index 0f653337..d4bf33f5 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,4 +1,8 @@
-/// Python interface Rust code for sourmash_plugin_branchwater.
+//! Rust-to-Python interface code for sourmash_plugin_branchwater, using pyo3.
+//!
+//! If you're using Rust, you're probably most interested in
+//! [utils](utils/index.html).
+
use pyo3::prelude::*;

#[macro_use]
@@ -112,6 +116,7 @@ fn do_fastgather(
}

#[pyfunction]
+#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (query_filenames, siglist_path, threshold_bp, ksize, scaled, moltype, output_path=None, save_matches=false, create_empty_results=false))]
fn do_fastmultigather(
    query_filenames: String,
@@ -237,6 +242,8 @@ fn do_multisearch(
    estimate_ani: bool,
    output_path: Option<String>,
) -> anyhow::Result<u8> {
+    let _ = env_logger::try_init();
+
    let selection = build_selection(ksize, scaled, &moltype);
    let allow_failed_sigpaths = true;

@@ -347,6 +354,8 @@ fn do_cluster(
    }
}

+/// Module interface for the `sourmash_plugin_branchwater` extension module.
+
#[pymodule]
fn sourmash_plugin_branchwater(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(do_manysearch, m)?)?;
diff --git a/src/manysearch.rs b/src/manysearch.rs
index 199af8b5..a6c320a0 100644
--- a/src/manysearch.rs
+++ b/src/manysearch.rs
@@ -9,7 +9,7 @@ use stats::{median, stddev};
use std::sync::atomic;
use std::sync::atomic::AtomicUsize;

-use crate::utils::{csvwriter_thread, load_collection, load_sketches, ReportType, SearchResult};
+use crate::utils::{csvwriter_thread, load_collection, ReportType, SearchResult};
use sourmash::ani_utils::ani_from_containment;
use sourmash::errors::SourmashError;
use sourmash::selection::Selection;
@@ -32,10 +32,11 @@ pub fn manysearch(
        ReportType::Query,
        allow_failed_sigpaths,
    )?;
+
    // load all query sketches into memory, downsampling on the way
-    let query_sketchlist = load_sketches(query_collection, selection, ReportType::Query).unwrap();
+    let query_sketchlist = query_collection.load_sketches(selection)?;

-    // Against: Load all _paths_, not signatures, into memory.
+    // Against: Load collection, potentially off disk & not into memory.
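+    // (individual against sketches are loaded record-by-record inside the
+    // parallel loop below, rather than all at once.)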
    let against_collection = load_collection(
        &against_filepath,
        selection,
        ReportType::General,
        allow_failed_sigpaths,
    )?;
@@ -61,7 +62,7 @@ pub fn manysearch(

    let send = against_collection
        .par_iter()
-        .filter_map(|(_idx, record)| {
+        .filter_map(|(coll, _idx, record)| {
            let i = processed_sigs.fetch_add(1, atomic::Ordering::SeqCst);
            if i % 1000 == 0 && i > 0 {
                eprintln!("Processed {} search sigs", i);
@@ -70,7 +71,7 @@ pub fn manysearch(
            let mut results = vec![];

            // against downsampling happens here
-            match against_collection.sig_from_record(record) {
+            match coll.sig_from_record(record) {
                Ok(against_sig) => {
                    if let Some(against_mh) = against_sig.minhash() {
                        for query in query_sketchlist.iter() {
diff --git a/src/mastiff_manygather.rs b/src/mastiff_manygather.rs
index ea99153c..eb665cb6 100644
--- a/src/mastiff_manygather.rs
+++ b/src/mastiff_manygather.rs
@@ -54,12 +54,12 @@ pub fn mastiff_manygather(

    let send = query_collection
        .par_iter()
-        .filter_map(|(_idx, record)| {
+        .filter_map(|(coll, _idx, record)| {
            let threshold = threshold_bp / selection.scaled()? as usize;
            let ksize = selection.ksize()?;

            // query downsampling happens here
-            match query_collection.sig_from_record(record) {
+            match coll.sig_from_record(record) {
                Ok(query_sig) => {
                    let mut results = vec![];
                    if let Some(query_mh) = query_sig.minhash() {
diff --git a/src/mastiff_manysearch.rs b/src/mastiff_manysearch.rs
index fac364c6..158dded1 100644
--- a/src/mastiff_manysearch.rs
+++ b/src/mastiff_manysearch.rs
@@ -1,6 +1,7 @@
/// mastiff_manysearch: mastiff-indexed version of manysearch.
use anyhow::Result;
use camino::Utf8PathBuf as PathBuf;
+use log::debug;
use rayon::prelude::*;
use std::sync::atomic;
use std::sync::atomic::AtomicUsize;
@@ -26,6 +27,7 @@ pub fn mastiff_manysearch(
        bail!("'{}' is not a valid RevIndex database", index);
    }
    // Open database once
+    debug!("Opening revindex: '{index}'");
    let db = RevIndex::open(index, true, None)?;
    println!("Loaded DB");

@@ -56,7 +58,7 @@ pub fn mastiff_manysearch(

    let send_result = query_collection
        .par_iter()
-        .filter_map(|(_idx, record)| {
+        .filter_map(|(coll, _idx, record)| {
            let i = processed_sigs.fetch_add(1, atomic::Ordering::SeqCst);
            if i % 1000 == 0 && i > 0 {
                eprintln!("Processed {} search sigs", i);
@@ -64,7 +66,7 @@ pub fn mastiff_manysearch(
            let mut results = vec![];

            // query downsample happens here
-            match query_collection.sig_from_record(record) {
+            match coll.sig_from_record(record) {
                Ok(query_sig) => {
                    if let Some(query_mh) = query_sig.minhash() {
                        let query_size = query_mh.size();
@@ -73,6 +75,7 @@ pub fn mastiff_manysearch(
                        db.matches_from_counter(counter, minimum_containment as usize);

                    // filter the matches for containment
+                    debug!("FOUND: {} matches for {:?}", matches.len(), query_sig);
                    for (path, overlap) in matches {
                        let containment = overlap as f64 / query_size as f64;
                        if containment >= minimum_containment {
diff --git a/src/multisearch.rs b/src/multisearch.rs
index 19d2264d..cf4bacb8 100644
--- a/src/multisearch.rs
+++ b/src/multisearch.rs
@@ -6,9 +6,7 @@ use sourmash::signature::SigsTrait;
use std::sync::atomic;
use std::sync::atomic::AtomicUsize;

-use crate::utils::{
-    csvwriter_thread, load_collection, load_sketches, MultiSearchResult, ReportType,
-};
+use crate::utils::{csvwriter_thread, load_collection, MultiSearchResult, ReportType};
use sourmash::ani_utils::ani_from_containment;

/// Search many queries against a list of signatures.
@@ -26,14 +24,14 @@ pub fn multisearch(
    output: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    // Load all queries into memory at once.
- let query_collection = load_collection( &query_filepath, selection, ReportType::Query, allow_failed_sigpaths, )?; - let queries = load_sketches(query_collection, selection, ReportType::Query).unwrap(); + + let queries = query_collection.load_sketches(selection)?; // Load all against sketches into memory at once. let against_collection = load_collection( @@ -42,7 +40,8 @@ pub fn multisearch( ReportType::Against, allow_failed_sigpaths, )?; - let against = load_sketches(against_collection, selection, ReportType::Against).unwrap(); + + let against = against_collection.load_sketches(selection)?; // set up a multi-producer, single-consumer channel. let (send, recv) = diff --git a/src/pairwise.rs b/src/pairwise.rs index 67acaafe..914c44f3 100644 --- a/src/pairwise.rs +++ b/src/pairwise.rs @@ -4,9 +4,7 @@ use rayon::prelude::*; use std::sync::atomic; use std::sync::atomic::AtomicUsize; -use crate::utils::{ - csvwriter_thread, load_collection, load_sketches, MultiSearchResult, ReportType, -}; +use crate::utils::{csvwriter_thread, load_collection, MultiSearchResult, ReportType}; use sourmash::ani_utils::ani_from_containment; use sourmash::selection::Selection; use sourmash::signature::SigsTrait; @@ -38,7 +36,8 @@ pub fn pairwise( &siglist ) } - let sketches = load_sketches(collection, selection, ReportType::General).unwrap(); + + let sketches = collection.load_sketches(selection)?; // set up a multi-producer, single-consumer channel. let (send, recv) = diff --git a/src/python/tests/conftest.py b/src/python/tests/conftest.py index 3f7021a1..f6f0f7f4 100644 --- a/src/python/tests/conftest.py +++ b/src/python/tests/conftest.py @@ -27,3 +27,11 @@ def zip_against(request): @pytest.fixture(params=[True, False]) def indexed(request): return request.param + +@pytest.fixture(params=[True, False]) +def indexed_query(request): + return request.param + +@pytest.fixture(params=[True, False]) +def indexed_against(request): + return request.param diff --git a/src/python/tests/sourmash_tst_utils.py b/src/python/tests/sourmash_tst_utils.py index f4ad4927..0c0e0e00 100644 --- a/src/python/tests/sourmash_tst_utils.py +++ b/src/python/tests/sourmash_tst_utils.py @@ -31,6 +31,15 @@ def zip_siglist(runtmp, siglist, db): return db +def index_siglist(runtmp, siglist, db, *, ksize=31, scaled=1000, moltype='DNA', + toggle_internal_storage='--internal-storage'): + # build index + runtmp.sourmash('scripts', 'index', siglist, + '-o', db, '-k', str(ksize), '--scaled', str(scaled), + '--moltype', moltype, toggle_internal_storage) + return db + + def scriptpath(scriptname='sourmash'): """Return the path to the scripts, in both dev and install situations.""" # note - it doesn't matter what the scriptname is here, as long as diff --git a/src/python/tests/test-data/2.fa.k21.sig.gz b/src/python/tests/test-data/2.fa.k21.sig.gz new file mode 100644 index 00000000..d63afbc3 Binary files /dev/null and b/src/python/tests/test-data/2.fa.k21.sig.gz differ diff --git a/src/python/tests/test-data/2.sig.zip b/src/python/tests/test-data/2.sig.zip new file mode 100644 index 00000000..28b41d59 Binary files /dev/null and b/src/python/tests/test-data/2.sig.zip differ diff --git a/src/python/tests/test-data/47.sig.zip b/src/python/tests/test-data/47.sig.zip new file mode 100644 index 00000000..1268e940 Binary files /dev/null and b/src/python/tests/test-data/47.sig.zip differ diff --git a/src/python/tests/test-data/63.sig.zip b/src/python/tests/test-data/63.sig.zip new file mode 100644 index 00000000..016d9218 Binary files /dev/null and 
b/src/python/tests/test-data/63.sig.zip differ diff --git a/src/python/tests/test_fastgather.py b/src/python/tests/test_fastgather.py index bd2ca5a4..f444818f 100644 --- a/src/python/tests/test_fastgather.py +++ b/src/python/tests/test_fastgather.py @@ -4,7 +4,8 @@ import sourmash from . import sourmash_tst_utils as utils -from .sourmash_tst_utils import (get_test_data, make_file_list, zip_siglist) +from .sourmash_tst_utils import (get_test_data, make_file_list, zip_siglist, + index_siglist) def test_installed(runtmp): @@ -14,7 +15,7 @@ def test_installed(runtmp): assert 'usage: fastgather' in runtmp.last_result.err -def test_simple(runtmp, zip_against): +def test_simple(runtmp, capfd, indexed_query, indexed_against, zip_against, toggle_internal_storage): # test basic execution! query = get_test_data('SRR606249.sig.gz') against_list = runtmp.output('against.txt') @@ -25,9 +26,17 @@ def test_simple(runtmp, zip_against): make_file_list(against_list, [sig2, sig47, sig63]) + if indexed_query: + query = index_siglist(runtmp, query, runtmp.output('query'), + scaled=100000) + if zip_against: against_list = zip_siglist(runtmp, against_list, runtmp.output('against.zip')) + if indexed_against: + against_list = index_siglist(runtmp, against_list, runtmp.output('db'), + toggle_internal_storage=toggle_internal_storage) + g_output = runtmp.output('gather.csv') p_output = runtmp.output('prefetch.csv') @@ -35,13 +44,22 @@ def test_simple(runtmp, zip_against): '-o', g_output, '-s', '100000') assert os.path.exists(g_output) + captured = capfd.readouterr() + print(captured.err) + df = pandas.read_csv(g_output) assert len(df) == 3 keys = set(df.keys()) assert {'query_filename', 'query_name', 'query_md5', 'match_name', 'match_md5', 'gather_result_rank', 'intersect_bp'}.issubset(keys) + # CTB note: we do not need to worry about this warning for query from a + # RocksDB, since there is only one. + if indexed_against: + print('indexed against:', indexed_against) + assert "WARNING: loading all sketches from a RocksDB into memory!" in captured.err + -def test_simple_with_prefetch(runtmp, zip_against): +def test_simple_with_prefetch(runtmp, zip_against, indexed, toggle_internal_storage): # test basic execution! query = get_test_data('SRR606249.sig.gz') against_list = runtmp.output('against.txt') @@ -55,6 +73,41 @@ def test_simple_with_prefetch(runtmp, zip_against): if zip_against: against_list = zip_siglist(runtmp, against_list, runtmp.output('against.zip')) + if indexed: + against_list = index_siglist(runtmp, against_list, runtmp.output('db'), + toggle_internal_storage=toggle_internal_storage) + + g_output = runtmp.output('gather.csv') + p_output = runtmp.output('prefetch.csv') + + runtmp.sourmash('scripts', 'fastgather', query, against_list, + '-o', g_output, '--output-prefetch', p_output, + '-s', '100000') + assert os.path.exists(g_output) + assert os.path.exists(p_output) + + df = pandas.read_csv(g_output) + assert len(df) == 3 + keys = set(df.keys()) + assert {'query_filename', 'query_name', 'query_md5', 'match_name', 'match_md5', 'gather_result_rank', 'intersect_bp'}.issubset(keys) + + df = pandas.read_csv(p_output) + assert len(df) == 3 + keys = set(df.keys()) + assert keys == {'query_filename', 'query_name', 'query_md5', 'match_name', 'match_md5', 'intersect_bp'} + + +def test_simple_with_prefetch_list_of_zips(runtmp): + # test basic execution! 
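+    # (the against pathlist here points at .sig.zip files, so collection
+    # loading has to descend into each zip.)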
+ query = get_test_data('SRR606249.sig.gz') + against_list = runtmp.output('against.txt') + + sig2 = get_test_data('2.sig.zip') + sig47 = get_test_data('47.sig.zip') + sig63 = get_test_data('63.sig.zip') + + make_file_list(against_list, [sig2, sig47, sig63]) + g_output = runtmp.output('gather.csv') p_output = runtmp.output('prefetch.csv') @@ -257,6 +310,7 @@ def test_bad_against_3(runtmp, capfd): assert 'InvalidArchive' in captured.err +@pytest.mark.xfail(reason="should work, bug") def test_against_multisigfile(runtmp, zip_against): # test against a sigfile that contains multiple sketches query = get_test_data('SRR606249.sig.gz') @@ -280,13 +334,8 @@ def test_against_multisigfile(runtmp, zip_against): '-o', g_output, '--output-prefetch', p_output, '-s', '100000') df = pandas.read_csv(g_output) - if zip_against: - assert len(df) == 3 - print(df) - else: - print(df) - assert len(df) == 1 - # @CTB this is a bug :(. It should load multiple sketches properly! + assert len(df) == 3 + print(df) def test_query_multisigfile(runtmp, capfd, zip_against): @@ -604,7 +653,7 @@ def test_simple_hp(runtmp): def test_indexed_against(runtmp, capfd): - # do not accept rocksdb for now + # accept rocksdb against, but with a warning query = get_test_data('SRR606249.sig.gz') against_list = runtmp.output('against.txt') @@ -621,15 +670,17 @@ def test_indexed_against(runtmp, capfd): g_output = runtmp.output('gather.csv') p_output = runtmp.output('prefetch.csv') - with pytest.raises(utils.SourmashCommandFailed): - runtmp.sourmash('scripts', 'fastgather', query, db_against, - '-o', g_output, '--output-prefetch', p_output, - '-s', '100000') + runtmp.sourmash('scripts', 'fastgather', query, db_against, + '-o', g_output, '--output-prefetch', p_output, + '-s', '100000') + + df = pandas.read_csv(g_output) + assert len(df) == 1 captured = capfd.readouterr() print(captured.err) - assert "Cannot load search signatures from a 'rocksdb' database. Please use sig, zip, or pathlist." in captured.err + assert "WARNING: loading all sketches from a RocksDB into memory!" in captured.err def test_simple_with_manifest_loading(runtmp): diff --git a/src/python/tests/test_fastmultigather.py b/src/python/tests/test_fastmultigather.py index 653b0ba5..643799b9 100644 --- a/src/python/tests/test_fastmultigather.py +++ b/src/python/tests/test_fastmultigather.py @@ -8,16 +8,8 @@ import sourmash from . import sourmash_tst_utils as utils -from .sourmash_tst_utils import (get_test_data, make_file_list, zip_siglist) - - -def index_siglist(runtmp, siglist, db, *, ksize=31, scaled=1000, moltype='DNA', - toggle_internal_storage='--internal-storage'): - # build index - runtmp.sourmash('scripts', 'index', siglist, - '-o', db, '-k', str(ksize), '--scaled', str(scaled), - '--moltype', moltype, toggle_internal_storage) - return db +from .sourmash_tst_utils import (get_test_data, make_file_list, zip_siglist, + index_siglist) def test_installed(runtmp): @@ -67,6 +59,47 @@ def test_simple(runtmp, zip_against): assert {'query_filename', 'query_name', 'query_md5', 'match_name', 'match_md5', 'intersect_bp', 'gather_result_rank'}.issubset(keys) +def test_simple_list_of_zips(runtmp): + # test basic execution! 
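+    # (as with fastgather above, the against list is a pathlist of
+    # .sig.zip files rather than .sig.gz files.)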
+ query = get_test_data('SRR606249.sig.gz') + sig2 = get_test_data('2.sig.zip') + sig47 = get_test_data('47.sig.zip') + sig63 = get_test_data('63.sig.zip') + + query_list = runtmp.output('query.txt') + against_list = runtmp.output('against.txt') + + make_file_list(query_list, [query]) + make_file_list(against_list, [sig2, sig47, sig63]) + + cwd = os.getcwd() + try: + os.chdir(runtmp.output('')) + runtmp.sourmash('scripts', 'fastmultigather', query_list, against_list, + '-s', '100000', '-t', '0') + finally: + os.chdir(cwd) + + print(os.listdir(runtmp.output(''))) + + g_output = runtmp.output('SRR606249.gather.csv') + p_output = runtmp.output('SRR606249.prefetch.csv') + assert os.path.exists(p_output) + + # check prefetch output (only non-indexed gather) + df = pandas.read_csv(p_output) + assert len(df) == 3 + keys = set(df.keys()) + assert keys == {'query_filename', 'query_name', 'query_md5', 'match_name', 'match_md5', 'intersect_bp'} + + assert os.path.exists(g_output) + df = pandas.read_csv(g_output) + print(df) + assert len(df) == 3 + keys = set(df.keys()) + assert {'query_filename', 'query_name', 'query_md5', 'match_name', 'match_md5', 'intersect_bp', 'gather_result_rank'}.issubset(keys) + + def test_simple_space_in_signame(runtmp): # test basic execution! query = get_test_data('SRR606249.sig.gz') @@ -1141,7 +1174,6 @@ def test_rocksdb_no_internal_storage_gather_fails(runtmp, capfd): "47.fa.sig.gz", "63.fa.sig.gz"]) - # index! runtmp.sourmash('scripts', 'index', against_list, '--no-internal-storage', '-o', 'subdir/against.rocksdb') diff --git a/src/python/tests/test_index.py b/src/python/tests/test_index.py index 140fe799..105c1cb2 100644 --- a/src/python/tests/test_index.py +++ b/src/python/tests/test_index.py @@ -35,6 +35,81 @@ def test_index(runtmp, toggle_internal_storage): assert 'index is done' in runtmp.last_result.err +def test_index_warning_message(runtmp, capfd): + # test basic index when it has to load things into memory - see #451. + siglist = runtmp.output('db-sigs.txt') + + # note: can't use zip w/o breaking index. See sourmash-bio/sourmash#3321. + sig2 = get_test_data('2.sig.zip') + sig47 = get_test_data('47.fa.sig.gz') + sig63 = get_test_data('63.fa.sig.gz') + + make_file_list(siglist, [sig2, sig47, sig63]) + + output = runtmp.output('db.rocksdb') + + runtmp.sourmash('scripts', 'index', siglist, '-o', output) + assert os.path.exists(output) + print(runtmp.last_result.err) + + assert 'index is done' in runtmp.last_result.err + captured = capfd.readouterr() + print(captured.err) + assert "WARNING: loading all sketches into memory in order to index." in captured.err + + +def test_index_error_message(runtmp, capfd): + # test basic index when it errors out b/c can't load + siglist = runtmp.output('db-sigs.txt') + + # note: can't use zip w/o breaking index. See sourmash-bio/sourmash#3321. + sig2 = get_test_data('2.sig.zip') + sig47 = get_test_data('47.fa.sig.gz') + sig63 = get_test_data('63.fa.sig.gz') + + make_file_list(siglist, [sig2, sig47, sig63]) + + output = runtmp.output('db.rocksdb') + + with pytest.raises(utils.SourmashCommandFailed): + runtmp.sourmash('scripts', 'index', siglist, '-o', output, + '--no-internal-storage') + + captured = capfd.readouterr() + print(captured.err) + assert "cannot index this type of collection with external storage" in captured.err + + +def test_index_recursive(runtmp, capfd): + # test index of pathlist containing standalone manifest containing zip. 
+    # a little ridiculous, but should hit the various branches in
+    # MultiCollection::load
+    siglist = runtmp.output('db-sigs.txt')
+
+    # our basic list of sketches...
+    sig2_zip = get_test_data('2.sig.zip')
+    sig47 = get_test_data('47.fa.sig.gz')
+    sig63 = get_test_data('63.fa.sig.gz')
+
+    # generate a standalone mf containing a zip
+    standalone_mf = runtmp.output('stand-mf.csv')
+    runtmp.sourmash('sig', 'collect', '-F', 'csv', '-o', standalone_mf,
+                    sig2_zip)
+
+    # now make a file list containing that mf
+    make_file_list(siglist, [standalone_mf, sig47, sig63])
+
+    output = runtmp.output('db.rocksdb')
+
+    runtmp.sourmash('scripts', 'index', siglist, '-o', output)
+
+    captured = capfd.readouterr()
+    print(captured.err)
+    assert "WARNING: loading all sketches into memory in order to index." in captured.err
+    assert 'index is done' in runtmp.last_result.err
+    assert 'Indexing 3 sketches.' in captured.err
+
+
def test_index_protein(runtmp, toggle_internal_storage):
    sigs = get_test_data('protein.zip')
    output = runtmp.output('db.rocksdb')
@@ -82,7 +157,7 @@ def test_index_missing_siglist(runtmp, capfd, toggle_internal_storage):

    captured = capfd.readouterr()
    print(captured.err)
-    assert 'Error: No such file or directory' in captured.err
+    assert 'Error: No such file or directory: ' in captured.err


def test_index_sig(runtmp, capfd, toggle_internal_storage):
diff --git a/src/python/tests/test_manysearch.py b/src/python/tests/test_manysearch.py
index ab0f5762..4750d9d6 100644
--- a/src/python/tests/test_manysearch.py
+++ b/src/python/tests/test_manysearch.py
@@ -4,7 +4,8 @@ import sourmash

from . import sourmash_tst_utils as utils
-from .sourmash_tst_utils import (get_test_data, make_file_list, zip_siglist)
+from .sourmash_tst_utils import (get_test_data, make_file_list, zip_siglist,
+                                 index_siglist)


def test_installed(runtmp):
@@ -14,13 +15,6 @@ def test_installed(runtmp):
    assert 'usage: manysearch' in runtmp.last_result.err


-def index_siglist(runtmp, siglist, db, ksize=31, scaled=1000, moltype='DNA'):
-    # build index
-    runtmp.sourmash('scripts', 'index', siglist,
-                    '-o', db, '-k', str(ksize), '--scaled', str(scaled),
-                    '--moltype', moltype)
-    return db
-
def test_simple(runtmp, zip_query, zip_against):
    # test basic execution!
    query_list = runtmp.output('query.txt')
@@ -176,7 +170,7 @@ def test_simple_abund(runtmp):
    assert total_weighted_hashes == 73489


-def test_simple_indexed(runtmp, zip_query):
+def test_simple_indexed(runtmp, zip_query, indexed_query):
    # test basic execution!
    query_list = runtmp.output('query.txt')
    against_list = runtmp.output('against.txt')
@@ -188,12 +182,67 @@ def test_simple_indexed(runtmp, zip_query):
    make_file_list(query_list, [sig2, sig47, sig63])
    make_file_list(against_list, [sig2, sig47, sig63])

+    if zip_query:
+        query_list = zip_siglist(runtmp, query_list, runtmp.output('query.zip'))
+
+    if indexed_query:
+        query_list = index_siglist(runtmp, query_list, runtmp.output('query_db'))
+
    output = runtmp.output('out.csv')
    against_list = index_siglist(runtmp, against_list, runtmp.output('db'))

-    if zip_query:
-        query_list = zip_siglist(runtmp, query_list, runtmp.output('query.zip'))
-
+    print('query_list is:', query_list)
    runtmp.sourmash('scripts', 'manysearch', query_list, against_list,
                    '-o', output, '-t', '0.01')
    assert os.path.exists(output)
+
+    df = pandas.read_csv(output)
+    assert len(df) == 5
+
+    dd = df.to_dict(orient='index')
+    print(dd)
+
+    for idx, row in dd.items():
+        # identical?
+        if row['match_name'] == row['query_name']:
+            assert float(row['containment']) == 1.0
+            assert float(row['query_containment_ani']) == 1.0
+        else:
+            # confirm hand-checked numbers
+            q = row['query_name'].split()[0]
+            m = row['match_name'].split()[0]
+            cont = float(row['containment'])
+            intersect_hashes = int(row['intersect_hashes'])
+            query_ani = float(row['query_containment_ani'])
+            cont = round(cont, 4)
+            query_ani = round(query_ani, 4)
+            print(q, m, f"{cont:.04}", f"{query_ani:.04}")
+
+            if q == 'NC_011665.1' and m == 'NC_009661.1':
+                assert cont == 0.4828
+                assert intersect_hashes == 2529
+                assert query_ani == 0.9768
+
+            if q == 'NC_009661.1' and m == 'NC_011665.1':
+                assert cont == 0.4885
+                assert intersect_hashes == 2529
+                assert query_ani == 0.9772
+
+
+def test_simple_list_of_zips(runtmp):
+    # test basic execution!
+    query_list = runtmp.output('query.txt')
+    against_list = runtmp.output('against.txt')
+
+    sig2 = get_test_data('2.sig.zip')
+    sig47 = get_test_data('47.sig.zip')
+    sig63 = get_test_data('63.sig.zip')
+
+    make_file_list(query_list, [sig2, sig47, sig63])
+    make_file_list(against_list, [sig2, sig47, sig63])
+
+    output = runtmp.output('out.csv')

    runtmp.sourmash('scripts', 'manysearch', query_list, against_list,
                    '-o', output, '-t', '0.01')
diff --git a/src/python/tests/test_multisearch.py b/src/python/tests/test_multisearch.py
index 87553615..8763c688 100644
--- a/src/python/tests/test_multisearch.py
+++ b/src/python/tests/test_multisearch.py
@@ -5,7 +5,8 @@
import sourmash

from . import sourmash_tst_utils as utils
-from .sourmash_tst_utils import (get_test_data, make_file_list, zip_siglist)
+from .sourmash_tst_utils import (get_test_data, make_file_list, zip_siglist,
+                                 index_siglist)


def test_installed(runtmp):
@@ -83,7 +84,7 @@ def test_simple_no_ani(runtmp, zip_query, zip_db):
    assert intersect_hashes == 2529


-def test_simple_ani(runtmp, zip_query, zip_db):
+def test_simple_ani(runtmp, zip_query, zip_db, indexed_query, indexed_against):
    # test basic execution!
    query_list = runtmp.output('query.txt')
    against_list = runtmp.output('against.txt')
@@ -99,9 +100,96 @@ def test_simple_ani(runtmp, zip_query, zip_db):

    if zip_db:
        against_list = zip_siglist(runtmp, against_list, runtmp.output('db.zip'))
+
    if zip_query:
        query_list = zip_siglist(runtmp, query_list, runtmp.output('query.zip'))

+    if indexed_query:
+        query_list = index_siglist(runtmp, query_list, runtmp.output('q_db'))
+
+    if indexed_against:
+        against_list = index_siglist(runtmp, against_list, runtmp.output('db'))
+
+    runtmp.sourmash('scripts', 'multisearch', query_list, against_list,
+                    '-o', output, '--ani')
+    assert os.path.exists(output)
+
+    df = pandas.read_csv(output)
+    assert len(df) == 5
+
+    dd = df.to_dict(orient='index')
+    print(dd)
+
+    for idx, row in dd.items():
+        # identical?
+        if row['match_name'] == row['query_name']:
+            assert row['query_md5'] == row['match_md5'], row
+            assert float(row['containment']) == 1.0
+            assert float(row['jaccard']) == 1.0
+            assert float(row['max_containment']) == 1.0
+            assert float(row['query_containment_ani']) == 1.0
+            assert float(row['match_containment_ani']) == 1.0
+            assert float(row['average_containment_ani']) == 1.0
+            assert float(row['max_containment_ani']) == 1.0
+
+        else:
+            # confirm hand-checked numbers
+            q = row['query_name'].split()[0]
+            m = row['match_name'].split()[0]
+            cont = float(row['containment'])
+            jaccard = float(row['jaccard'])
+            maxcont = float(row['max_containment'])
+            intersect_hashes = int(row['intersect_hashes'])
+            q1_ani = float(row['query_containment_ani'])
+            q2_ani = float(row['match_containment_ani'])
+            avg_ani = float(row['average_containment_ani'])
+            max_ani = float(row['max_containment_ani'])
+
+
+            jaccard = round(jaccard, 4)
+            cont = round(cont, 4)
+            maxcont = round(maxcont, 4)
+            q1_ani = round(q1_ani, 4)
+            q2_ani = round(q2_ani, 4)
+            avg_ani = round(avg_ani, 4)
+            max_ani = round(max_ani, 4)
+            print(q, m, f"{jaccard:.04}", f"{cont:.04}", f"{maxcont:.04}", f"{q1_ani:.04}", f"{q2_ani:.04}", f"{avg_ani:.04}", f"{max_ani:.04}")
+
+            if q == 'NC_011665.1' and m == 'NC_009661.1':
+                assert jaccard == 0.3207
+                assert cont == 0.4828
+                assert maxcont == 0.4885
+                assert intersect_hashes == 2529
+                assert q1_ani == 0.9768
+                assert q2_ani == 0.9772
+                assert avg_ani == 0.977
+                assert max_ani == 0.9772
+
+            if q == 'NC_009661.1' and m == 'NC_011665.1':
+                assert jaccard == 0.3207
+                assert cont == 0.4885
+                assert maxcont == 0.4885
+                assert intersect_hashes == 2529
+                assert q1_ani == 0.9772
+                assert q2_ani == 0.9768
+                assert avg_ani == 0.977
+                assert max_ani == 0.9772
+
+
+def test_simple_ani_list_of_zips(runtmp):
+    # test basic execution against a pathlist file of zips
+    query_list = runtmp.output('query.txt')
+    against_list = runtmp.output('against.txt')
+
+    sig2 = get_test_data('2.sig.zip')
+    sig47 = get_test_data('47.sig.zip')
+    sig63 = get_test_data('63.sig.zip')
+
+    make_file_list(query_list, [sig2, sig47, sig63])
+    make_file_list(against_list, [sig2, sig47, sig63])
+
+    output = runtmp.output('out.csv')
+
    runtmp.sourmash('scripts', 'multisearch', query_list, against_list,
                    '-o', output, '--ani')
    assert os.path.exists(output)
@@ -168,6 +256,82 @@ def test_simple_ani(runtmp, zip_query, zip_db):
    assert max_ani == 0.9772


+def test_simple_ani_list_of_csv(runtmp):
+    # test basic execution against a pathlist file of manifests
+    query_list = runtmp.output('query.txt')
+    against_list = runtmp.output('against.txt')
+
+    sig2 = get_test_data('2.sig.zip')
+    sig47 = get_test_data('47.sig.zip')
+    sig63 = get_test_data('63.sig.zip')
+
+    runtmp.sourmash('sig', 'collect', sig2, '-o', 'sig2.mf.csv', '-F', 'csv')
+    runtmp.sourmash('sig', 'collect', sig47, '-o', 'sig47.mf.csv', '-F', 'csv')
+    runtmp.sourmash('sig', 'collect', sig63, '-o', 'sig63.mf.csv', '-F', 'csv')
+
+    make_file_list(query_list, ['sig2.mf.csv', 'sig47.mf.csv', 'sig63.mf.csv'])
+    make_file_list(against_list, ['sig2.mf.csv', 'sig47.mf.csv', 'sig63.mf.csv'])
+
+    output = runtmp.output('out.csv')
+
+    runtmp.sourmash('scripts', 'multisearch', query_list, against_list,
+                    '-o', output, '--ani')
+    assert os.path.exists(output)
+
+    df = pandas.read_csv(output)
+    assert len(df) == 5
+
+    dd = df.to_dict(orient='index')
+    print(dd)
+
+
+def test_simple_ani_standalone_manifest(runtmp):
+    # test basic execution of a standalone manifest
+    against_list =
 runtmp.output('against.sig.zip')
+
+    sig2 = get_test_data('2.sig.zip')
+    sig47 = get_test_data('47.sig.zip')
+    sig63 = get_test_data('63.sig.zip')
+
+    runtmp.sourmash('sig', 'cat', sig2, sig47, sig63, '-o', against_list)
+
+    picklist_file = runtmp.output('pl.csv')
+    with open(picklist_file, 'w', newline='') as fp:
+        w = csv.writer(fp)
+        w.writerow(['ident'])
+        w.writerow(['CP001071.1'])
+
+    # use picklist to create a standalone manifest
+    query_csv = runtmp.output('select.mf.csv')
+    runtmp.sourmash('sig', 'check', '--picklist',
+                    f'{picklist_file}:ident:ident',
+                    '-m', query_csv, against_list)
+
+    output = runtmp.output('out.csv')
+
+    runtmp.sourmash('scripts', 'multisearch', query_csv, against_list,
+                    '-o', output, '--ani')
+    assert os.path.exists(output)
+
+    df = pandas.read_csv(output)
+    assert len(df) == 1    # should only be the one, identical match.
+
+    dd = df.to_dict(orient='index')
+    print(dd)
+
+    for idx, row in dd.items():
+        # identical?
+        if row['match_name'] == row['query_name']:
+            assert row['query_md5'] == row['match_md5'], row
+            assert float(row['containment']) == 1.0
+            assert float(row['jaccard']) == 1.0
+            assert float(row['max_containment']) == 1.0
+            assert float(row['query_containment_ani']) == 1.0
+            assert float(row['match_containment_ani']) == 1.0
+            assert float(row['average_containment_ani']) == 1.0
+            assert float(row['max_containment_ani']) == 1.0
+
+
def test_simple_threshold(runtmp, zip_query, zip_db):
    # test with a simple threshold => only 3 results
    query_list = runtmp.output('query.txt')
@@ -223,6 +387,44 @@ def test_simple_manifest(runtmp):
    assert len(df) == 3


+def test_lists_of_standalone_manifests(runtmp, capfd):
+    # test pathlists of manifests
+    query_list = runtmp.output('query.txt')
+    against_list = runtmp.output('against.txt')
+
+    sig2 = get_test_data('2.fa.sig.gz')
+    sig47 = get_test_data('47.fa.sig.gz')
+    sig63 = get_test_data('63.fa.sig.gz')
+
+    sig2_mf = runtmp.output('2.mf.csv')
+    runtmp.sourmash('sig', 'collect', sig2, '-o', sig2_mf, '-F', 'csv')
+    sig47_mf = runtmp.output('47.mf.csv')
+    runtmp.sourmash('sig', 'collect', sig47, '-o', sig47_mf, '-F', 'csv')
+    sig63_mf = runtmp.output('63.mf.csv')
+    runtmp.sourmash('sig', 'collect', sig63, '-o', sig63_mf, '-F', 'csv')
+
+    make_file_list(query_list, [sig2_mf, sig47_mf, sig63_mf])
+    make_file_list(against_list, [sig2, sig47, sig63])
+
+    query_mf = runtmp.output('qmf.csv')
+    against_mf = runtmp.output('amf.csv')
+
+    runtmp.sourmash("sig", "manifest", query_list, "-o", query_mf)
+    runtmp.sourmash("sig", "manifest", against_list, "-o", against_mf)
+
+    output = runtmp.output('out.csv')
+
+    runtmp.sourmash('scripts', 'multisearch', query_mf, against_mf,
+                    '-o', output, '-t', '0.5')
+    assert os.path.exists(output)
+
+    df = pandas.read_csv(output)
+    assert len(df) == 3
+
+    captured = capfd.readouterr()
+    print(captured.err)
+
+
def test_missing_query(runtmp, capfd, zip_query):
    # test with a missing query list
    query_list = runtmp.output('query.txt')
@@ -420,10 +622,9 @@ def test_empty_query(runtmp, capfd):

    captured = capfd.readouterr()
    print(captured.err)
    assert "No query signatures loaded, exiting." in captured.err
-    # @CTB


-def test_nomatch_query(runtmp, capfd, zip_query):
+def test_nomatch_query_warn(runtmp, capfd, zip_query):
    # test a non-matching (diff ksize) in query; do we get warning message?
     query_list = runtmp.output('query.txt')
     against_list = runtmp.output('against.txt')

@@ -451,6 +652,64 @@ def test_nomatch_query(runtmp, capfd, zip_query):
     assert 'WARNING: skipped 1 query paths - no compatible signatures' in captured.err
 
 
+def test_nomatch_query_exit(runtmp, capfd, zip_query):
+    # test loading no matching sketches - do we error exit appropriately?
+    query_list = runtmp.output('query.txt')
+    against_list = runtmp.output('against.txt')
+
+    sig1 = get_test_data('1.fa.k21.sig.gz')
+    sig2 = get_test_data('2.fa.sig.gz')
+    sig47 = get_test_data('47.fa.sig.gz')
+    sig63 = get_test_data('63.fa.sig.gz')
+
+    make_file_list(query_list, [sig1])
+    make_file_list(against_list, [sig2, sig47, sig63])
+
+    output = runtmp.output('out.csv')
+
+    if zip_query:
+        query_list = zip_siglist(runtmp, query_list, runtmp.output('query.zip'))
+
+    with pytest.raises(utils.SourmashCommandFailed):
+        runtmp.sourmash('scripts', 'multisearch', query_list, against_list,
+                        '-o', output)
+
+    captured = capfd.readouterr()
+    print(captured.err)
+
+    assert 'WARNING: skipped 1 query paths - no compatible signatures' in captured.err
+    assert 'No query signatures loaded, exiting' in captured.err
+
+
+def test_nomatch_against(runtmp, capfd, zip_query):
+    # test all non-matching (diff ksize) sigs in against; do we warn and error exit?
+    query_list = runtmp.output('query.txt')
+    against_list = runtmp.output('against.txt')
+
+    sig1 = get_test_data('1.fa.k21.sig.gz')
+    sig2 = get_test_data('2.fa.sig.gz')
+    sig47 = get_test_data('47.fa.sig.gz')
+    sig63 = get_test_data('63.fa.sig.gz')
+
+    make_file_list(query_list, [sig2, sig47, sig63, sig1])
+    make_file_list(against_list, [sig2, sig47, sig63])
+
+    output = runtmp.output('out.csv')
+
+    if zip_query:
+        query_list = zip_siglist(runtmp, query_list, runtmp.output('query.zip'))
+
+    with pytest.raises(utils.SourmashCommandFailed):
+        runtmp.sourmash('scripts', 'multisearch', query_list, against_list,
+                        '-o', output, '-k', '21')
+
+    captured = capfd.readouterr()
+    print(captured.err)
+
+    assert 'WARNING: skipped 3 search paths - no compatible signatures' in captured.err
+    assert 'No search signatures loaded, exiting' in captured.err
+
+
 def test_load_only_one_bug(runtmp, capfd, zip_db):
     # check that we behave properly when presented with multiple against
     # sketches
diff --git a/src/python/tests/test_pairwise.py b/src/python/tests/test_pairwise.py
index c8264069..cba2a297 100644
--- a/src/python/tests/test_pairwise.py
+++ b/src/python/tests/test_pairwise.py
@@ -5,7 +5,8 @@ import sourmash
 
 from . import sourmash_tst_utils as utils
-from .sourmash_tst_utils import (get_test_data, make_file_list, zip_siglist)
+from .sourmash_tst_utils import (get_test_data, make_file_list, zip_siglist,
+                                 index_siglist)
 
 
 def test_installed(runtmp):
@@ -15,7 +16,7 @@ def test_installed(runtmp):
     assert 'usage: pairwise' in runtmp.last_result.err
 
 
-def test_simple_no_ani(runtmp, zip_query):
+def test_simple_no_ani(runtmp, capfd, zip_query, indexed):
     # test basic execution!
     query_list = runtmp.output('query.txt')

@@ -30,6 +31,9 @@ def test_simple_no_ani(runtmp, zip_query):
     if zip_query:
         query_list = zip_siglist(runtmp, query_list, runtmp.output('query.zip'))
 
+    if indexed:
+        query_list = index_siglist(runtmp, query_list, runtmp.output('db'))
+
     runtmp.sourmash('scripts', 'pairwise', query_list,
                     '-o', output, '-t', '-1')
 
     assert os.path.exists(output)
@@ -64,6 +68,12 @@ def test_simple_no_ani(runtmp, zip_query):
         assert maxcont == 0.4885
         assert intersect_hashes == 2529
 
+    captured = capfd.readouterr()
+    print(captured.err)
+
+    if indexed:
+        assert "WARNING: loading all sketches from a RocksDB into memory!" in captured.err
+
 
 def test_simple_ani(runtmp, zip_query):
     # test basic execution!
@@ -251,7 +261,7 @@ def test_missing_query(runtmp, capfd, zip_db):
 
 
-def test_empty_query(runtmp):
+def test_empty_query(runtmp, capfd):
     # test with an empty query list
     query_list = runtmp.output('query.txt')
 
@@ -267,11 +277,11 @@ def test_empty_query(runtmp):
     runtmp.sourmash('scripts', 'pairwise', query_list,
                     '-o', output)
 
-    print(runtmp.last_result.err)
-    # @CTB
+    captured = capfd.readouterr()
+    assert 'Error: No analysis signatures loaded, exiting.' in captured.err
 
 
-def test_nomatch_query(runtmp, capfd, zip_query):
+def test_nomatch_query_warn(runtmp, capfd, zip_query):
     # test a non-matching (diff ksize) in query; do we get warning message?
     query_list = runtmp.output('query.txt')
 
@@ -297,6 +307,31 @@ def test_nomatch_query(runtmp, capfd, zip_query):
     assert 'WARNING: skipped 1 analysis paths - no compatible signatures' in captured.err
 
 
+def test_nomatch_query_exit(runtmp, capfd, zip_query):
+    # test all non-matching (diff ksize) sigs in query; do we error exit appropriately?
+    query_list = runtmp.output('query.txt')
+
+    sig1 = get_test_data('1.fa.k21.sig.gz')
+    sig2 = get_test_data('2.fa.k21.sig.gz')
+
+    make_file_list(query_list, [sig1, sig2])
+
+    output = runtmp.output('out.csv')
+
+    if zip_query:
+        query_list = zip_siglist(runtmp, query_list, runtmp.output('query.zip'))
+
+    with pytest.raises(utils.SourmashCommandFailed):
+        runtmp.sourmash('scripts', 'pairwise', query_list,
+                        '-o', output)
+
+    captured = capfd.readouterr()
+    print(captured.err)
+
+    assert 'WARNING: skipped 2 analysis paths - no compatible signatures' in captured.err
+    assert 'Error: No analysis signatures loaded, exiting.' in captured.err
+
+
 def test_load_only_one_bug(runtmp, capfd, zip_db):
     # check that we behave properly when presented with multiple query
     # sketches
diff --git a/src/utils.rs b/src/utils/mod.rs
similarity index 86%
rename from src/utils.rs
rename to src/utils/mod.rs
index 0b7df6c9..b2e36db6 100644
--- a/src/utils.rs
+++ b/src/utils/mod.rs
@@ -1,9 +1,10 @@
-/// Utility functions for sourmash_plugin_branchwater.
+//! Utility functions for `sourmash_plugin_branchwater`.
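+//!
+//! Collection loading is routed through `multicollection::MultiCollection`;
+//! see `load_collection()` below for the top-level entry point.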
 use rayon::prelude::*;
+
 use sourmash::encodings::HashFunctions;
 use sourmash::selection::Select;
 
-use anyhow::{anyhow, Context, Result};
+use anyhow::{anyhow, Result};
 use camino::Utf8Path as Path;
 use camino::Utf8PathBuf as PathBuf;
 use csv::Writer;
@@ -12,7 +13,7 @@ use serde::{Deserialize, Serialize};
 use std::cmp::{Ordering, PartialOrd};
 use std::collections::BinaryHeap;
 use std::fs::{create_dir_all, File};
-use std::io::{BufRead, BufReader, BufWriter, Write};
+use std::io::{BufWriter, Write};
 use std::panic;
 use std::sync::atomic;
 use std::sync::atomic::AtomicUsize;
@@ -20,25 +21,19 @@ use zip::write::{ExtendedFileOptions, FileOptions, ZipWriter};
 use zip::CompressionMethod;
 
 use sourmash::ani_utils::{ani_ci_from_containment, ani_from_containment};
-use sourmash::collection::Collection;
 use sourmash::manifest::{Manifest, Record};
 use sourmash::selection::Selection;
 use sourmash::signature::{Signature, SigsTrait};
 use sourmash::sketch::minhash::KmerMinHash;
-use sourmash::storage::{FSStorage, InnerStorage, SigStore};
+use sourmash::storage::SigStore;
 use stats::{median, stddev};
 use std::collections::{HashMap, HashSet};
 use std::hash::{Hash, Hasher};
 
-/// Track a name/minhash.
-pub struct SmallSignature {
-    pub location: String,
-    pub name: String,
-    pub md5sum: String,
-    pub minhash: KmerMinHash,
-}
-/// Structure to hold overlap information from comparisons.
+pub mod multicollection;
+use multicollection::MultiCollection;
+
+/// Structure to hold overlap information from comparisons.
 pub struct PrefetchResult {
     pub name: String,
     pub md5sum: String,
@@ -95,7 +90,9 @@ pub fn prefetch(
 
 /// Write list of prefetch matches.
 pub fn write_prefetch(
-    query: &SigStore,
+    query_filename: String,
+    query_name: String,
+    query_md5: String,
     prefetch_output: Option<String>,
     matchlist: &BinaryHeap<PrefetchResult>,
 ) -> Result<(), Box<dyn std::error::Error>> {
@@ -125,12 +122,7 @@ pub fn write_prefetch(
         writeln!(
             &mut writer,
             "{},\"{}\",{},\"{}\",{},{}",
-            query.filename(),
-            query.name(),
-            query.md5sum(),
-            m.name,
-            m.md5sum,
-            m.overlap
+            query_filename, query_name, query_md5, m.name, m.md5sum, m.overlap
         )
         .ok();
     }
@@ -433,49 +425,28 @@ fn process_prefix_csv(
     Ok((results, n_fastas))
 }
 
-// Load all compatible minhashes from a collection into memory
-// also store sig name and md5 alongside, as we usually need those
-pub fn load_sketches(
-    collection: Collection,
-    selection: &Selection,
-    _report_type: ReportType,
-) -> Result<Vec<SmallSignature>> {
-    let sketchinfo: Vec<SmallSignature> = collection
-        .par_iter()
-        .filter_map(|(_idx, record)| {
-            let sig = collection.sig_from_record(record).ok()?;
-            let selected_sig = sig.clone().select(selection).ok()?;
-            let minhash = selected_sig.minhash()?.clone();
-
-            Some(SmallSignature {
-                location: record.internal_location().to_string(),
-                name: sig.name(),
-                md5sum: sig.md5sum(),
-                minhash,
-            })
-        })
-        .collect();
-
-    Ok(sketchinfo)
-}
 
 /// Load a collection of sketches from a file, filtering to keep only
 /// those with a minimum overlap.
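+///
+/// Illustrative call (a sketch using names from this module; `against`,
+/// `query_mh`, and `threshold_hashes` are assumed to be in scope):
+///
+/// ```ignore
+/// let (matchlist, skipped_paths, failed_paths) =
+///     load_sketches_above_threshold(against, &query_mh, threshold_hashes)?;
+/// ```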
 pub fn load_sketches_above_threshold(
-    against_collection: Collection,
+    against_collection: MultiCollection,
     query: &KmerMinHash,
     threshold_hashes: u64,
 ) -> Result<(BinaryHeap<PrefetchResult>, usize, usize)> {
     let skipped_paths = AtomicUsize::new(0);
     let failed_paths = AtomicUsize::new(0);
+    if against_collection.contains_revindex {
+        eprintln!("WARNING: loading all sketches from a RocksDB into memory!");
+    }
 
     let matchlist: BinaryHeap<PrefetchResult> = against_collection
         .par_iter()
-        .filter_map(|(_idx, against_record)| {
+        .filter_map(|(coll, _idx, against_record)| {
             let mut results = Vec::new();
             // Load against into memory
-            if let Ok(against_sig) = against_collection.sig_from_record(against_record) {
+            if let Ok(against_sig) = coll.sig_from_record(against_record) {
                 let against_filename = against_sig.filename();
                 let against_mh: KmerMinHash = against_sig.try_into().expect("cannot get sketch");
                 let against_md5 = against_mh.md5sum(); // keep original md5sum
@@ -543,148 +514,25 @@ impl std::fmt::Display for ReportType {
     }
 }
 
-pub fn collection_from_zipfile(sigpath: &Path, report_type: &ReportType) -> Result<Collection> {
-    match Collection::from_zipfile(sigpath) {
-        Ok(collection) => Ok(collection),
-        Err(_) => bail!("failed to load {} zipfile: '{}'", report_type, sigpath),
-    }
-}
-
-fn collection_from_manifest(
-    sigpath: &Path,
-    report_type: &ReportType,
-) -> Result<Collection, anyhow::Error> {
-    let file = File::open(sigpath)
-        .with_context(|| format!("Failed to open {} file: '{}'", report_type, sigpath))?;
-
-    let reader = BufReader::new(file);
-    let manifest = Manifest::from_reader(reader).with_context(|| {
-        format!(
-            "Failed to read {} manifest from: '{}'",
-            report_type, sigpath
-        )
-    })?;
-
-    if manifest.is_empty() {
-        // If the manifest is empty, return an error constructed with the anyhow! macro
-        Err(anyhow!("could not read as manifest: '{}'", sigpath))
-    } else {
-        // If the manifest is not empty, proceed to create and return the Collection
-        Ok(Collection::new(
-            manifest,
-            InnerStorage::new(
-                FSStorage::builder()
-                    .fullpath("".into())
-                    .subdir("".into())
-                    .build(),
-            ),
-        ))
-    }
-}
-
-fn collection_from_pathlist(
-    sigpath: &Path,
-    report_type: &ReportType,
-) -> Result<(Collection, usize), anyhow::Error> {
-    let file = File::open(sigpath).with_context(|| {
-        format!(
-            "Failed to open {} pathlist file: '{}'",
-            report_type, sigpath
-        )
-    })?;
-    let reader = BufReader::new(file);
-
-    // load list of paths
-    let lines: Vec<_> = reader
-        .lines()
-        .filter_map(|line| match line {
-            Ok(path) => Some(path),
-            Err(_err) => None,
-        })
-        .collect();
-
-    // load sketches from paths in parallel.
-    let n_failed = AtomicUsize::new(0);
-    let records: Vec<Record> = lines
-        .par_iter()
-        .filter_map(|path| match Signature::from_path(path) {
-            Ok(signatures) => {
-                let recs: Vec<Record> = signatures
-                    .into_iter()
-                    .flat_map(|v| Record::from_sig(&v, path))
-                    .collect();
-                Some(recs)
-            }
-            Err(err) => {
-                eprintln!("Sketch loading error: {}", err);
-                eprintln!("WARNING: could not load sketches from path '{}'", path);
-                let _ = n_failed.fetch_add(1, atomic::Ordering::SeqCst);
-                None
-            }
-        })
-        .flatten()
-        .collect();
-
-    if records.is_empty() {
-        eprintln!(
-            "No valid signatures found in {} pathlist '{}'",
-            report_type, sigpath
-        );
-    }
-
-    let manifest: Manifest = records.into();
-    let collection = Collection::new(
-        manifest,
-        InnerStorage::new(
-            FSStorage::builder()
-                .fullpath("".into())
-                .subdir("".into())
-                .build(),
-        ),
-    );
-    let n_failed = n_failed.load(atomic::Ordering::SeqCst);
-
-    Ok((collection, n_failed))
-}
-
-fn collection_from_signature(sigpath: &Path, report_type: &ReportType) -> Result<Collection> {
-    let signatures = Signature::from_path(sigpath).with_context(|| {
-        format!(
-            "Failed to load {} signatures from: '{}'",
-            report_type, sigpath
-        )
-    })?;
-
-    Collection::from_sigs(signatures).with_context(|| {
-        format!(
-            "Loaded {} signatures but failed to load as collection: '{}'",
-            report_type, sigpath
-        )
-    })
-}
 
+/// Load a MultiCollection from a path; this is the new top-level load function.
 pub fn load_collection(
     siglist: &String,
     selection: &Selection,
     report_type: ReportType,
     allow_failed: bool,
-) -> Result<Collection, anyhow::Error> {
+) -> Result<MultiCollection, anyhow::Error> {
     let sigpath = PathBuf::from(siglist);
 
     if !sigpath.exists() {
         bail!("No such file or directory: '{}'", &sigpath);
     }
 
-    // disallow rocksdb input here
-    if is_revindex_database(&sigpath) {
-        bail!("Cannot load {} signatures from a 'rocksdb' database. Please use sig, zip, or pathlist.", report_type);
-    }
-
     eprintln!("Reading {}(s) from: '{}'", report_type, &siglist);
 
     let mut last_error = None;
 
     let collection = if sigpath.extension().map_or(false, |ext| ext == "zip") {
-        match collection_from_zipfile(&sigpath, &report_type) {
+        match MultiCollection::from_zipfile(&sigpath) {
             Ok(coll) => Some((coll, 0)),
             Err(e) => {
                 last_error = Some(e);
@@ -695,32 +543,40 @@ pub fn load_collection(
         None
     };
 
-    let collection =
-        collection.or_else(|| match collection_from_manifest(&sigpath, &report_type) {
-            Ok(coll) => Some((coll, 0)),
-            Err(e) => {
-                last_error = Some(e);
-                None
-            }
-        });
+    let collection = collection.or_else(|| match MultiCollection::from_rocksdb(&sigpath) {
+        Ok(coll) => Some((coll, 0)),
+        Err(e) => {
+            last_error = Some(e);
+            None
+        }
+    });
 
     let collection =
-        collection.or_else(|| match collection_from_signature(&sigpath, &report_type) {
-            Ok(coll) => Some((coll, 0)),
-            Err(e) => {
-                last_error = Some(e);
-                None
-            }
-        });
+        collection.or_else(
+            || match MultiCollection::from_standalone_manifest(&sigpath) {
+                Ok(coll) => Some((coll, 0)),
+                Err(e) => {
+                    last_error = Some(e);
+                    None
+                }
+            },
+        );
 
-    let collection =
-        collection.or_else(|| match collection_from_pathlist(&sigpath, &report_type) {
-            Ok((coll, n_failed)) => Some((coll, n_failed)),
-            Err(e) => {
-                last_error = Some(e);
-                None
-            }
-        });
+    let collection = collection.or_else(|| match MultiCollection::from_signature(&sigpath) {
+        Ok(coll) => Some((coll, 0)),
+        Err(e) => {
+            last_error = Some(e);
+            None
+        }
+    });
+
+    let collection = collection.or_else(|| match MultiCollection::from_pathlist(&sigpath) {
+        Ok((coll, n_failed)) => Some((coll, n_failed)),
+        Err(e) => {
+            last_error = Some(e);
+            None
+        }
+    });
 
     match collection {
         Some((coll, n_failed)) => {
@@ -772,7 +628,7 @@ pub fn load_collection(
 /// Returns an error if:
 /// * No signatures were successfully loaded.
 pub fn report_on_collection_loading(
-    collection: &Collection,
+    collection: &MultiCollection,
     skipped_paths: usize,
     failed_paths: usize,
     report_type: ReportType,
@@ -972,7 +828,7 @@ pub fn consume_query_by_gather(
     }
     let query_md5sum: String = orig_query_mh.md5sum().clone();
     let query_name = query.name().clone();
-    let query_scaled = orig_query_mh.scaled().clone() as usize; //query_mh.scaled() as usize
+    let query_scaled = orig_query_mh.scaled() as usize;
 
     let mut query_mh = orig_query_mh.clone();
     let mut orig_query_ds = orig_query_mh.clone().downsample_scaled(scaled)?;
@@ -1043,11 +899,11 @@ pub fn consume_query_by_gather(
             query_filename: query.filename(),
             query_name: query_name.clone(),
             query_md5: query_md5sum.clone(),
-            query_bp: query_bp.clone(),
+            query_bp,
             ksize,
             moltype: query_moltype.clone(),
-            scaled: query_scaled.clone(),
-            query_n_hashes: query_n_hashes,
+            scaled: query_scaled,
+            query_n_hashes,
             query_abundance: query_mh.track_abundance(),
             query_containment_ani: match_.query_containment_ani,
             match_containment_ani: match_.match_containment_ani,
diff --git a/src/utils/multicollection.rs b/src/utils/multicollection.rs
new file mode 100644
index 00000000..b2ffe093
--- /dev/null
+++ b/src/utils/multicollection.rs
@@ -0,0 +1,431 @@
+//! MultiCollection implementation to handle sketches coming from multiple files.
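+//!
+//! Rough usage sketch (illustrative, using the APIs defined below; the
+//! zipfile name and `selection` are assumed):
+//!
+//! ```ignore
+//! let multi = MultiCollection::from_zipfile("sigs.zip".into())?;
+//! let multi = multi.select(&selection)?;
+//! for (coll, _idx, record) in multi.item_iter() {
+//!     let sig = coll.sig_from_record(record)?;
+//!     eprintln!("loaded {} from {}", sig.name(), record.internal_location());
+//! }
+//! ```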
+
+use rayon::prelude::*;
+use sourmash::prelude::*;
+
+use anyhow::{anyhow, Context, Result};
+use camino::Utf8Path as Path;
+use camino::Utf8PathBuf;
+use log::{debug, trace};
+use std::collections::HashSet;
+use std::fs::File;
+use std::io::{BufRead, BufReader};
+use std::sync::atomic;
+use std::sync::atomic::AtomicUsize;
+
+use sourmash::collection::Collection;
+use sourmash::encodings::Idx;
+use sourmash::errors::SourmashError;
+use sourmash::manifest::{Manifest, Record};
+use sourmash::selection::{Select, Selection};
+use sourmash::signature::Signature;
+use sourmash::sketch::minhash::KmerMinHash;
+use sourmash::storage::{FSStorage, InnerStorage, SigStore};
+
+/// A collection of sketches, potentially stored in multiple files.
+#[derive(Clone)]
+pub struct MultiCollection {
+    collections: Vec<Collection>,
+    pub contains_revindex: bool, // track whether one or more Collection is a RevIndex
+}
+
+impl MultiCollection {
+    fn new(collections: Vec<Collection>, contains_revindex: bool) -> Self {
+        Self {
+            collections,
+            contains_revindex,
+        }
+    }
+
+    // Try loading a set of paths as JSON files only. Fails on any Err.
+    //
+    // This is a legacy method that supports pathlists for
+    // 'index'. See sourmash-bio/sourmash#3321 for background.
+    //
+    // Use load_set_of_paths for full generality!
+    //
+    // CTB NOTE: this could potentially have very poor performance if
+    // there are a lot of _good_ files, with one _bad_ one. Look into
+    // exiting first loop early.
+    fn load_set_of_json_files(paths: &HashSet<String>) -> Result<MultiCollection> {
+        // load sketches from paths in parallel.
+        let n_failed = AtomicUsize::new(0);
+        let records: Vec<Record> = paths
+            .par_iter()
+            .filter_map(|path| match Signature::from_path(path) {
+                Ok(signatures) => {
+                    let recs: Vec<Record> = signatures
+                        .into_iter()
+                        .flat_map(|v| Record::from_sig(&v, path))
+                        .collect();
+                    Some(recs)
+                }
+                Err(_) => {
+                    let _ = n_failed.fetch_add(1, atomic::Ordering::SeqCst);
+                    None
+                }
+            })
+            .flatten()
+            .collect();
+
+        let n_failed = n_failed.load(atomic::Ordering::SeqCst);
+
+        if records.is_empty() || n_failed > 0 {
+            return Err(anyhow!("cannot load everything as JSON files"));
+        }
+
+        let manifest: Manifest = records.into();
+        let collection = Collection::new(
+            manifest,
+            InnerStorage::new(
+                FSStorage::builder()
+                    .fullpath("".into())
+                    .subdir("".into())
+                    .build(),
+            ),
+        );
+        Ok(MultiCollection::from(collection))
+    }
+
+    // Turn a set of paths into list of Collections - works recursively
+    // if needed, and can handle paths of any supported type.
+    fn load_set_of_paths(paths: &HashSet<String>) -> (MultiCollection, usize) {
+        let n_failed = AtomicUsize::new(0);
+
+        // could just use a variant of load_collection here?
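+        // Dispatch on the path suffix: ".zip" is loaded as a zipfile
+        // collection, ".csv" as a standalone manifest, and anything else is
+        // treated as a signature file; paths that fail to load are counted
+        // in n_failed.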
+        let colls: Vec<MultiCollection> = paths
+            .par_iter()
+            .filter_map(|iloc| match iloc {
+                // load from zipfile
+                x if x.ends_with(".zip") => {
+                    debug!("loading sigs from zipfile {}", x);
+                    match Collection::from_zipfile(x) {
+                        Ok(coll) => Some(MultiCollection::from(coll)),
+                        Err(err) => {
+                            eprintln!("WARNING: could not load zipfile '{}': {}", x, err);
+                            let _ = n_failed.fetch_add(1, atomic::Ordering::SeqCst);
+                            None
+                        }
+                    }
+                }
+                // load from CSV (standalone manifest)
+                x if x.ends_with(".csv") => {
+                    debug!("loading sigs from standalone manifest {}", x);
+
+                    let x: String = x.into();
+                    let utf_path: &Path = x.as_str().into();
+                    match MultiCollection::from_standalone_manifest(utf_path) {
+                        Ok(coll) => Some(coll),
+                        Err(err) => {
+                            eprintln!("WARNING: could not load manifest from '{}': {}", x, err);
+                            let _ = n_failed.fetch_add(1, atomic::Ordering::SeqCst);
+                            None
+                        }
+                    }
+                }
+                // load from (by default) a sigfile
+                _ => {
+                    debug!("loading sigs from sigfile {}", iloc);
+                    let signatures = match Signature::from_path(iloc) {
+                        Ok(signatures) => Some(signatures),
+                        Err(err) => {
+                            eprintln!("Sketch loading error: {}", err);
+                            None
+                        }
+                    };
+
+                    match signatures {
+                        Some(signatures) => {
+                            let records: Vec<_> = signatures
+                                .into_iter()
+                                .flat_map(|v| Record::from_sig(&v, iloc))
+                                .collect();
+
+                            let manifest: Manifest = records.into();
+                            let collection = Collection::new(
+                                manifest,
+                                InnerStorage::new(
+                                    FSStorage::builder()
+                                        .fullpath("".into())
+                                        .subdir("".into())
+                                        .build(),
+                                ),
+                            );
+                            Some(MultiCollection::from(collection))
+                        }
+                        None => {
+                            eprintln!("WARNING: could not load sketches from path '{}'", iloc);
+                            let _ = n_failed.fetch_add(1, atomic::Ordering::SeqCst);
+                            None
+                        }
+                    }
+                }
+            })
+            .collect();
+
+        let n_failed = n_failed.load(atomic::Ordering::SeqCst);
+        (MultiCollection::from(colls), n_failed)
+    }
+
+    /// Build from a standalone manifest. Note: the tricky bit here
+    /// is that the manifest may select only a subset of the rows,
+    /// using (name, md5) tuples.
+    pub fn from_standalone_manifest(sigpath: &Path) -> Result<Self> {
+        debug!("multi from standalone manifest!");
+        let file =
+            File::open(sigpath).with_context(|| format!("Failed to open file: '{}'", sigpath))?;
+
+        let reader = BufReader::new(file);
+        let manifest = Manifest::from_reader(reader)
+            .with_context(|| format!("Failed to read manifest from: '{}'", sigpath))?;
+        debug!("got {} records from standalone manifest", manifest.len());
+
+        if manifest.is_empty() {
+            Err(anyhow!("could not read as manifest: '{}'", sigpath))
+        } else {
+            let ilocs: HashSet<_> = manifest.internal_locations().map(String::from).collect();
+            let (mut colls, _n_failed) = MultiCollection::load_set_of_paths(&ilocs);
+
+            colls.intersect_manifest(&manifest);
+
+            Ok(colls)
+        }
+    }
+
+    /// Load a collection from a .zip file.
+    pub fn from_zipfile(sigpath: &Path) -> Result<Self> {
+        debug!("multi from zipfile!");
+        match Collection::from_zipfile(sigpath) {
+            Ok(collection) => Ok(MultiCollection::new(vec![collection], false)),
+            Err(_) => bail!("failed to load zipfile: '{}'", sigpath),
+        }
+    }
+
+    /// Load a collection from a RocksDB.
+    pub fn from_rocksdb(sigpath: &Path) -> Result<Self> {
+        debug!("multi from rocksdb!");
+        // duplicate logic from is_revindex_database
+        let path: Utf8PathBuf = sigpath.into();
+
+        let mut is_rocksdb = false;
+
+        if path.is_dir() {
+            let current_file = path.join("CURRENT");
+            if current_file.exists() && current_file.is_file() {
+                is_rocksdb = true;
+            }
+        }
+
+        if is_rocksdb {
+            match Collection::from_rocksdb(sigpath) {
+                Ok(collection) => {
+                    debug!("...rocksdb successful!");
+                    Ok(MultiCollection::new(vec![collection], true))
+                }
+                Err(_) => bail!("failed to load rocksdb: '{}'", sigpath),
+            }
+        } else {
+            bail!("not a rocksdb: '{}'", sigpath)
+        }
+    }
+
+    /// Load a collection from a list of paths.
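+    ///
+    /// Returns the loaded MultiCollection plus the number of paths that
+    /// failed to load. Illustrative call (filename assumed; the pathlist
+    /// contains one path per line):
+    ///
+    /// ```ignore
+    /// let (multi, n_failed) = MultiCollection::from_pathlist("pathlist.txt".into())?;
+    /// eprintln!("loaded {} sketches; {} paths failed", multi.len(), n_failed);
+    /// ```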
+    pub fn from_pathlist(sigpath: &Path) -> Result<(Self, usize)> {
+        debug!("multi from pathlist!");
+        let file = File::open(sigpath)
+            .with_context(|| format!("Failed to open pathlist file: '{}'", sigpath))?;
+        let reader = BufReader::new(file);
+
+        // load set of paths
+        let lines: HashSet<_> = reader
+            .lines()
+            .filter_map(|line| match line {
+                Ok(path) => Some(path),
+                Err(_err) => None,
+            })
+            .collect();
+
+        let val = MultiCollection::load_set_of_json_files(&lines);
+
+        let (multi, n_failed) = match val {
+            Ok(collection) => {
+                eprintln!("SUCCEEDED in loading pathlist as JSON files");
+                // CTB note: if any path fails to load,
+                // load_set_of_json_files returns Err.
+                (collection, 0)
+            }
+            Err(_) => {
+                eprintln!("FAILED to load pathlist as JSON files; falling back to recursive loading");
+                MultiCollection::load_set_of_paths(&lines)
+            }
+        };
+
+        Ok((multi, n_failed))
+    }
+
+    // Load from a sig file
+    pub fn from_signature(sigpath: &Path) -> Result<Self> {
+        debug!("multi from signature!");
+        let signatures = Signature::from_path(sigpath)
+            .with_context(|| format!("Failed to load signatures from: '{}'", sigpath))?;
+
+        let coll = Collection::from_sigs(signatures).with_context(|| {
+            format!(
+                "Loaded signatures but failed to load as collection: '{}'",
+                sigpath
+            )
+        })?;
+        Ok(MultiCollection::new(vec![coll], false))
+    }
+
+    pub fn len(&self) -> usize {
+        let val: usize = self.collections.iter().map(|c| c.len()).sum();
+        val
+    }
+
+    pub fn is_empty(&self) -> bool {
+        let val: usize = self.collections.iter().map(|c| c.len()).sum();
+        val == 0
+    }
+
+    // iterate over tuples
+    pub fn item_iter(&self) -> impl Iterator<Item = (&Collection, Idx, &Record)> {
+        let s: Vec<_> = self
+            .collections
+            .iter()
+            .flat_map(|c| c.iter().map(move |(_idx, record)| (c, _idx, record)))
+            .collect();
+        s.into_iter()
+    }
+
+    pub fn par_iter(&self) -> impl IndexedParallelIterator<Item = (&Collection, Idx, &Record)> {
+        // first create a Vec of all triples (Collection, Idx, Record)
+        let s: Vec<_> = self
+            .collections
+            .iter() // CTB: are we loading things into memory here? No...
+            .flat_map(|c| c.iter().map(move |(_idx, record)| (c, _idx, record)))
+            .collect();
+        // then return a parallel iterator over the Vec.
+        s.into_par_iter()
+    }
+
+    pub fn get_first_sig(&self) -> Option<SigStore> {
+        if !self.is_empty() {
+            let query_item = self.item_iter().next()?;
+            let (coll, _, _) = query_item;
+            coll.sig_for_dataset(0).ok()
+        } else {
+            None
+        }
+    }
+
+    // Load all sketches into memory, using SmallSignature to track original
+    // signature metadata.
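+    //
+    // Illustrative call (a sketch; `collection` and `selection` assumed):
+    //
+    //   let sketches = collection.load_sketches(&selection)?;
+    //
+    // Note that this consumes `self` and pulls every selected sketch into
+    // memory; sketches[i].minhash is then directly available.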
+    pub fn load_sketches(self, selection: &Selection) -> Result<Vec<SmallSignature>> {
+        if self.contains_revindex {
+            eprintln!("WARNING: loading all sketches from a RocksDB into memory!");
+        }
+        let sketchinfo: Vec<_> = self
+            .par_iter()
+            .filter_map(|(coll, _idx, record)| match coll.sig_from_record(record) {
+                Ok(sig) => {
+                    trace!(
+                        "MultiCollection load sketch: from:{} idx:{} loc:{}",
+                        coll.storage().spec(),
+                        _idx,
+                        record.internal_location()
+                    );
+                    let selected_sig = sig.clone().select(selection).ok()?;
+                    let minhash = selected_sig.minhash()?.clone();
+
+                    Some(SmallSignature {
+                        location: record.internal_location().to_string(),
+                        name: sig.name(),
+                        md5sum: sig.md5sum(),
+                        minhash,
+                    })
+                }
+                Err(_) => {
+                    eprintln!(
+                        "FAILED to load sketch from '{}'",
+                        record.internal_location()
+                    );
+                    None
+                }
+            })
+            .collect();
+
+        Ok(sketchinfo)
+    }
+
+    fn intersect_manifest(&mut self, manifest: &Manifest) {
+        for coll in self.collections.iter_mut() {
+            coll.intersect_manifest(manifest);
+        }
+    }
+
+    // Load all sketches into memory, producing an in-memory Collection.
+    pub fn load_all_sigs(self, selection: &Selection) -> Result<Collection> {
+        let all_sigs: Vec<Signature> = self
+            .par_iter()
+            .filter_map(|(coll, _idx, record)| match coll.sig_from_record(record) {
+                Ok(sig) => {
+                    let sig = sig.clone().select(selection).ok()?;
+                    Some(Signature::from(sig))
+                }
+                Err(_) => {
+                    eprintln!(
+                        "FAILED to load sketch from '{}'",
+                        record.internal_location()
+                    );
+                    None
+                }
+            })
+            .collect();
+        Ok(Collection::from_sigs(all_sigs)?)
+    }
+}
+
+impl Select for MultiCollection {
+    fn select(self, selection: &Selection) -> Result<Self, SourmashError> {
+        let collections = self
+            .collections
+            .into_iter()
+            .filter_map(|c| c.select(selection).ok())
+            .collect();
+
+        Ok(MultiCollection::new(collections, self.contains_revindex))
+    }
+}
+
+// Convert a single Collection into a MultiCollection
+impl From<Collection> for MultiCollection {
+    fn from(coll: Collection) -> Self {
+        // CTB: how can we check if revindex?
+        MultiCollection::new(vec![coll], false)
+    }
+}
+
+// Merge a bunch of MultiCollection structs into one
+impl From<Vec<MultiCollection>> for MultiCollection {
+    fn from(multi: Vec<MultiCollection>) -> Self {
+        let mut x: Vec<Collection> = vec![];
+        let mut contains_revindex = false;
+        for mc in multi.into_iter() {
+            for coll in mc.collections.into_iter() {
+                x.push(coll);
+            }
+            contains_revindex = contains_revindex || mc.contains_revindex;
+        }
+        MultiCollection::new(x, contains_revindex)
+    }
+}
+
+// Extract a single Collection from a MultiCollection, if possible
+impl TryFrom<MultiCollection> for Collection {
+    type Error = &'static str;
+
+    fn try_from(multi: MultiCollection) -> Result<Self, Self::Error> {
+        if multi.collections.len() == 1 {
+            // this must succeed b/c len > 0
+            Ok(multi.collections.into_iter().next().unwrap())
+        } else {
+            Err("More than one Collection in this MultiCollection; cannot convert")
+        }
+    }
+}
+
+/// Track a name/minhash.
+pub struct SmallSignature {
+    pub location: String,
+    pub name: String,
+    pub md5sum: String,
+    pub minhash: KmerMinHash,
+}
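+
+// A minimal sanity check (an illustrative sketch, not part of this PR's test
+// suite): merging MultiCollections via From<Vec<MultiCollection>> should
+// preserve emptiness and the revindex flag.
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn merge_of_empty_multicollections_is_empty() {
+        let a = MultiCollection::new(vec![], false);
+        let b = MultiCollection::new(vec![], false);
+        let merged = MultiCollection::from(vec![a, b]);
+        assert_eq!(merged.len(), 0);
+        assert!(merged.is_empty());
+        assert!(!merged.contains_revindex);
+    }
+}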