From 38d99b6952bf024a60664553d6e5ea18f7891dc1 Mon Sep 17 00:00:00 2001 From: Tessa Pierce Ward Date: Thu, 26 Dec 2024 17:57:43 -0800 Subject: [PATCH] MRG: modify n simultaneous downloads; update buildutils (#154) This PR integrates the changes to `BuildUtils` (`MultiSelection` details, minor changes to `BuildCollection` filtering + writing) that arose from integration into branchwater. It also makes the number of simultaneous downloads tunable, since I was having trouble when using the 3 default permits with large eukaryotic genomes. It also handles changes associated with zipfile handling from https://github.com/sourmash-bio/sourmash/pull/3431 arising from updating sourmash core to 0.18.0 ref #134 --- Cargo.lock | 43 +- Cargo.toml | 4 +- README.md | 19 +- src/directsketch.rs | 290 ++++++------- src/lib.rs | 8 +- .../sourmash_plugin_directsketch/__init__.py | 8 +- src/utils/buildutils.rs | 386 +++++++----------- src/utils/mod.rs | 3 +- tests/test_gbsketch.py | 15 +- tests/test_urlsketch.py | 2 +- 10 files changed, 330 insertions(+), 448 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9f24d17..58aef4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -992,13 +992,12 @@ checksum = "a2983372caf4480544083767bf2d27defafe32af49ab4df3a0b7fc90793a3664" [[package]] name = "nalgebra" -version = "0.32.6" +version = "0.33.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b5c17de023a86f59ed79891b2e5d5a94c705dbe904a5b5c9c952ea6221b03e4" +checksum = "26aecdf64b707efd1310e3544d709c5c0ac61c13756046aaaba41be5c4f66a3b" dependencies = [ "approx", "matrixmultiply", - "nalgebra-macros", "num-complex", "num-rational", "num-traits", @@ -1008,17 +1007,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "nalgebra-macros" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "254a5372af8fc138e36684761d3c0cdb758a4410e938babcff1c860ce14ddbfc" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "native-tls" version = "0.2.12" @@ -1082,6 +1070,16 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2bf50223579dc7cdcfb3bfcacf7069ff68243f8c363f62ffa99cf000a6b9c451" +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", +] + [[package]] name = "num-complex" version = "0.4.6" @@ -1117,6 +1115,7 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824" dependencies = [ + "num-bigint", "num-integer", "num-traits", ] @@ -1627,9 +1626,9 @@ dependencies = [ [[package]] name = "roaring" -version = "0.10.6" +version = "0.10.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f4b84ba6e838ceb47b41de5194a60244fac43d9fe03b71dbe8c5a201081d6d1" +checksum = "41589aba99537475bf697f2118357cad1c31590c5a1b9f6d9fc4ad6d07503661" dependencies = [ "bytemuck", "byteorder", @@ -1814,9 +1813,9 @@ dependencies = [ [[package]] name = "simba" -version = "0.8.1" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "061507c94fc6ab4ba1c9a0305018408e312e17c041eb63bef8aa726fa33aceae" +checksum = "b3a386a501cd104797982c15ae17aafe8b9261315b5d07e3ec803f2ea26be0fa" dependencies = [ "approx", "num-complex", @@ -1864,9 +1863,9 @@ checksum = "bceb57dc07c92cdae60f5b27b3fa92ecaaa42fe36c55e22dbfb0b44893e0b1f7" [[package]] name = "sourmash" -version = "0.17.2" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54e30f752d984b1d8456024973f8d89772b4ba248f592b77b57d59ad27a232a0" +checksum = "fec589a91cf0d7d8cde46a51ccf165f32b9b4d709688f69b3fcea14c6f12e6e6" dependencies = [ "az", "byteorder", @@ -1950,9 +1949,9 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "statrs" -version = "0.17.1" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f697a07e4606a0a25c044de247e583a330dbb1731d11bc7350b81f48ad567255" +checksum = "2a3fe7c28c6512e766b0874335db33c94ad7b8f9054228ae1c2abd47ce7d335e" dependencies = [ "approx", "nalgebra", diff --git a/Cargo.toml b/Cargo.toml index 05ad101..c3c9c92 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,10 +9,10 @@ name = "sourmash_plugin_directsketch" crate-type = ["cdylib"] [dependencies] -pyo3 = { version = "0.23.3", features = ["extension-module", "anyhow"] } +pyo3 = { version = "0.23.3", features = ["extension-module","anyhow"]} rayon = "1.10.0" serde = { version = "1.0.204", features = ["derive"] } -sourmash = { version = "0.17.2"} +sourmash = { version = "0.18.0"} serde_json = "1.0.134" niffler = "2.4.0" needletail = "0.5.1" diff --git a/README.md b/README.md index 7b24325..566c532 100644 --- a/README.md +++ b/README.md @@ -47,8 +47,15 @@ pip install sourmash_plugin_directsketch ## Usage Considerations -If you're building large databases (over 20k files), we highly recommend you use batched zipfiles (v0.4+) to facilitate restart. If you encounter unexpected failures and are using a single zipfile output (default), `gbsketch`/`urlsketch` will have to re-download and re-sketch all files. If you instead set a batch size using `--batch-size`, e.g. 10000, then `gbsketch`/`urlsketch` can load any batched zips that finished writing, and avoid re-generating those signatures. For `gbsketch`, the batch size represents the number of accessions included in each zip, with all signatures associated with an accession grouped within a single `zip`. For `urlsketch`, the batch size represents the number of total signatures included in each zip. Note that batches will use the `--output` file to build batched filenames, so if you provided `output.zip`, your batches will be `output.1.zip`, `output.2.zip`, etc. +### Allowing restart with batching +If you're building large databases, we highly recommend you use batched zipfiles (v0.4+) to facilitate restart. If you encounter unexpected failures and are using a single zipfile output (default), `gbsketch`/`urlsketch` will have to re-download and re-sketch all files. If you instead set a batch size using `--batch-size`, then `gbsketch`/`urlsketch` can load any batched zips that finished writing, and avoid re-generating those signatures. For `gbsketch`, the batch size represents the number of accessions included in each zip, with all signatures associated with an accession grouped within a single `zip`. For `urlsketch`, the batch size represents the number of sigs associated with each url provided. Note that batches will use the `--output` file to build batched filenames, so if you provided `output.zip`, your batches will be `output.1.zip`, `output.2.zip`, etc. For small genomes (e.g. microbes), you can keep batch sizes quite large, e.g. 1000s-10000s. For large eukaryotic genomes where download takes much longer, you may want to use smaller batch sizes. + +To build a single database after batched sketching, you can use `sig cat` to build a single zipfile (`sourmash sig cat *.zip -o OUTPUT.zip`) or `sig collect` to collect all the zips into a standalone manifest that can be used with sourmash and branchwater commands. + +### Memory Requirements + +Directsketch downloads the full file, optionally checking the `md5sum`, then performs the sketch. As a result, you will need enough memory to hold up to 3 genomes in memory at once. For microbial genomes, this is trivial. For large eukaryotic genomes (e.g. plants!), be sure to provide sufficient memory. You can tune the number of simultaneous downloads (and thus, the number of genomes that will be in memory simultaneously) with `--n-simultaneous-downloads`. ## Running the commands @@ -125,7 +132,8 @@ options: -f FASTAS, --fastas FASTAS Write fastas here --batch-size BATCH_SIZE - Write smaller zipfiles, each containing sigs associated with this number of accessions. This allows gbsketch to recover after unexpected failures, rather than needing to + Write smaller zipfiles, each containing sigs associated with this number of accessions. + This allows gbsketch to recover after unexpected failures, rather than needing to restart sketching from scratch. Default: write all sigs to single zipfile. -k, --keep-fasta write FASTA files in addition to sketching. Default: do not write FASTA files --download-only just download genomes; do not sketch @@ -138,6 +146,8 @@ options: number of cores to use (default is all available) -r RETRY_TIMES, --retry-times RETRY_TIMES number of times to retry failed downloads + -n {1,2,3}, --n-simultaneous-downloads {1,2,3} + number of accessions to download simultaneously (default=1) -g, --genomes-only just download and sketch genome (DNA) files -m, --proteomes-only just download and sketch proteome (protein) files ``` @@ -186,7 +196,8 @@ options: -o OUTPUT, --output OUTPUT output zip file for the signatures --batch-size BATCH_SIZE - Write smaller zipfiles, each containing sigs associated with this number of accessions. This allows urlsketch to recover after unexpected failures, rather than needing to + Write smaller zipfiles, each containing sigs associated with this number of urls. + This allows urlsketch to recover after unexpected failures, rather than needing to restart sketching from scratch. Default: write all sigs to single zipfile. -f FASTAS, --fastas FASTAS Write fastas here @@ -202,6 +213,8 @@ options: number of cores to use (default is all available) -r RETRY_TIMES, --retry-times RETRY_TIMES number of times to retry failed downloads + -n {1,2,3}, --n-simultaneous-downloads {1,2,3} + number of simultaneous downloads (default=3) ``` ## Code of Conduct diff --git a/src/directsketch.rs b/src/directsketch.rs index cbac395..0c89e01 100644 --- a/src/directsketch.rs +++ b/src/directsketch.rs @@ -22,9 +22,7 @@ use crate::utils::{ InputMolType, MultiCollection, }; -use crate::utils::buildutils::{ - BuildCollection, BuildManifest, MultiBuildCollection, MultiSelect, MultiSelection, -}; +use crate::utils::buildutils::{BuildCollection, BuildManifest, MultiSelect, MultiSelection}; use reqwest::Url; async fn find_genome_directory( @@ -237,17 +235,14 @@ async fn dl_sketch_assembly_accession( location: &PathBuf, retry: Option, keep_fastas: bool, - sigs: &mut BuildCollection, + mut sigs: BuildCollection, genomes_only: bool, proteomes_only: bool, download_only: bool, -) -> Result<( - MultiBuildCollection, - Vec, - Vec, -)> { +) -> Result<(BuildCollection, Vec, Vec)> { + // todo -- move default retry to main function let retry_count = retry.unwrap_or(3); // Default retry count - let mut built_sigs = MultiBuildCollection::new(); + let empty_coll = BuildCollection::new(); let mut download_failures = Vec::::new(); let mut checksum_failures = Vec::::new(); @@ -283,7 +278,7 @@ async fn dl_sketch_assembly_accession( download_failures.push(failed_download_protein); } - return Ok((built_sigs, download_failures, checksum_failures)); + return Ok((empty_coll, download_failures, checksum_failures)); } }; let md5sum_url = GenBankFileType::Checksum.url(&base_url, &full_name); @@ -322,7 +317,7 @@ async fn dl_sketch_assembly_accession( checksum_failures.push(failed_checksum_download); } // return early from function b/c we can't check any checksums - return Ok((built_sigs, download_failures, checksum_failures)); + return Ok((empty_coll, download_failures, checksum_failures)); } }; @@ -383,16 +378,8 @@ async fn dl_sketch_assembly_accession( }; } } - if !download_only { - // remove any template sigs that were not populated - sigs.filter_empty(); - // to do: can we use sigs directly rather than adding to a multibuildcollection, now? - if !sigs.is_empty() { - built_sigs.add_collection(sigs); - } - } - Ok((built_sigs, download_failures, checksum_failures)) + Ok((sigs, download_failures, checksum_failures)) } #[allow(clippy::too_many_arguments)] @@ -402,17 +389,13 @@ async fn dl_sketch_url( location: &PathBuf, retry: Option, _keep_fastas: bool, - sigs: &mut BuildCollection, + mut sigs: BuildCollection, _genomes_only: bool, _proteomes_only: bool, download_only: bool, -) -> Result<( - MultiBuildCollection, - Vec, - Vec, -)> { +) -> Result<(BuildCollection, Vec, Vec)> { let retry_count = retry.unwrap_or(3); // Default retry count - let mut built_sigs = MultiBuildCollection::new(); + let empty_coll = BuildCollection::new(); let mut download_failures = Vec::::new(); let mut checksum_failures = Vec::::new(); @@ -442,12 +425,6 @@ async fn dl_sketch_url( sigs.build_sigs_from_data(data, "protein", name.clone(), filename.clone())?; } }; - // remove any template sigs that were not populated - sigs.filter_empty(); - // to do: can we use sigs directly rather than adding to a collection, now? - if !sigs.is_empty() { - built_sigs.add_collection(sigs); - } } } Err(err) => { @@ -466,6 +443,7 @@ async fn dl_sketch_url( reason: error_message.clone(), }; checksum_failures.push(checksum_mismatch); + sigs = empty_coll; } else { let failed_download = FailedDownload { accession: accession.clone(), @@ -476,11 +454,12 @@ async fn dl_sketch_url( url: Some(url), }; download_failures.push(failed_download); + sigs = empty_coll; } } } - Ok((built_sigs, download_failures, checksum_failures)) + Ok((sigs, download_failures, checksum_failures)) } fn get_current_directory() -> Result { @@ -561,34 +540,24 @@ async fn load_existing_zip_batches(outpath: &PathBuf) -> Result<(MultiCollection if let Some(file_name) = entry_path.file_name() { // Check if the file matches the base zip file or any batched zip file (outpath.zip, outpath.1.zip, etc.) if let Some(captures) = zip_file_pattern.captures(file_name) { - // Wrap the `from_zipfile` call in `catch_unwind` to prevent panic propagation - let result = panic::catch_unwind(|| Collection::from_zipfile(&entry_path)); - match result { - Ok(Ok(collection)) => { - // Successfully loaded the collection, push to `collections` + Collection::from_zipfile(&entry_path) + .map(|collection| { collections.push(collection); + eprintln!("loaded existing collection from {}", &entry_path); - // Extract the batch number (if it exists) and update the highest_batch + // Extract batch number if it exists if let Some(batch_str) = captures.get(1) { if let Ok(batch_num) = batch_str.as_str().parse::() { highest_batch = max(highest_batch, batch_num); } } - } - Ok(Err(e)) => { - // Handle the case where `from_zipfile` returned an error + }) + .unwrap_or_else(|e| { eprintln!( - "Warning: Failed to load zip file '{}'. Error: {:?}", + "Warning: Failed to load zip file '{}'; skipping. Zipfile Error: {:?}", entry_path, e ); - continue; // Skip the file and continue - } - Err(_) => { - // The code inside `from_zipfile` panicked - eprintln!("Warning: Invalid zip file '{}'; skipping.", entry_path); - continue; // Skip the file and continue - } - } + }); } } } @@ -622,7 +591,7 @@ async fn create_or_get_zip_file( } pub fn zipwriter_handle( - mut recv_sigs: tokio::sync::mpsc::Receiver, + mut recv_sigs: tokio::sync::mpsc::Receiver, output_sigs: Option, batch_size: usize, // Tunable batch size mut batch_index: usize, // starting batch index @@ -632,13 +601,13 @@ pub fn zipwriter_handle( let mut md5sum_occurrences = HashMap::new(); let mut zip_manifest = BuildManifest::new(); let mut wrote_sigs = false; - let mut file_count = 0; // Count of files in the current batch + let mut acc_count = 0; // count the number of accessions (or urls, in urlsketch) let mut zip_writer = None; if let Some(outpath) = output_sigs { let outpath: PathBuf = outpath.into(); - while let Some(mut multibuildcoll) = recv_sigs.recv().await { + while let Some(mut buildcoll) = recv_sigs.recv().await { if zip_writer.is_none() { // create zip file if needed zip_writer = @@ -653,29 +622,28 @@ pub fn zipwriter_handle( if let Some(zip_writer) = zip_writer.as_mut() { // write all sigs from sigcoll. Note that this method updates each record's internal location - for sigcoll in &mut multibuildcoll.collections { - match sigcoll - .async_write_sigs_to_zip(zip_writer, &mut md5sum_occurrences) - .await - { - Ok(_) => { - file_count += sigcoll.size(); - wrote_sigs = true; - } - Err(e) => { - let error = e.context("Error processing signature"); - if error_sender.send(error).await.is_err() { - return; - } + match buildcoll + .async_write_sigs_to_zip(zip_writer, &mut md5sum_occurrences) + .await + { + Ok(_) => { + wrote_sigs = true; + } + Err(e) => { + let error = e.context("Error processing signature"); + if error_sender.send(error).await.is_err() { + return; } } - // Add all records from sigcoll manifest - zip_manifest.extend_from_manifest(&sigcoll.manifest); } + // Add all records from buildcoll manifest + zip_manifest.extend_from_manifest(&buildcoll.manifest); + // each buildcoll has accession + acc_count += 1; } // if batch size is non-zero and is reached, close the current zip - if batch_size > 0 && file_count >= batch_size { + if batch_size > 0 && acc_count >= batch_size { eprintln!("writing batch {}", batch_index); if let Some(mut zip_writer) = zip_writer.take() { if let Err(e) = zip_manifest @@ -692,13 +660,13 @@ pub fn zipwriter_handle( } // Start a new batch batch_index += 1; - file_count = 0; + acc_count = 0; zip_manifest.clear(); zip_writer = None; // reset zip_writer so a new zip will be created when needed } } - if file_count > 0 { + if acc_count > 0 { // write the final manifest if let Some(mut zip_writer) = zip_writer.take() { if let Err(e) = zip_manifest @@ -884,12 +852,14 @@ pub async fn gbsketch( proteomes_only: bool, download_only: bool, batch_size: u32, + n_permits: usize, output_sigs: Option, ) -> Result<(), anyhow::Error> { let batch_size = batch_size as usize; let mut batch_index = 1; let mut existing_records_map: HashMap = HashMap::new(); let mut filter = false; + // if writing sigs, prepare output and look for existing sig batches if let Some(ref output_sigs) = output_sigs { // Create outpath from output_sigs let outpath = PathBuf::from(output_sigs); @@ -922,7 +892,7 @@ pub async fn gbsketch( create_dir_all(&download_path)?; } // create channels. buffer size here is 4 b/c we can do 3 downloads simultaneously - let (send_sigs, recv_sigs) = tokio::sync::mpsc::channel::(4); + let (send_sigs, recv_sigs) = tokio::sync::mpsc::channel::(4); let (send_failed, recv_failed) = tokio::sync::mpsc::channel::(4); let (send_failed_checksums, recv_failed_checksum) = tokio::sync::mpsc::channel::(4); @@ -954,7 +924,7 @@ pub async fn gbsketch( handles.push(checksum_failures_handle); // Worker tasks - let semaphore = Arc::new(Semaphore::new(3)); // Limiting concurrent downloads + let semaphore = Arc::new(Semaphore::new(n_permits)); // Limiting concurrent downloads let client = Arc::new(Client::new()); // Open the file containing the accessions synchronously @@ -963,52 +933,49 @@ pub async fn gbsketch( bail!("No accessions to download and sketch.") } - let sig_template_result = BuildCollection::from_param_str(param_str.as_str()); - let mut sig_templates = match sig_template_result { - Ok(sig_templates) => sig_templates, - Err(e) => { - bail!("Failed to parse params string: {}", e); - } - }; - + let mut sig_templates = BuildCollection::new(); let mut genomes_only = genomes_only; let mut proteomes_only = proteomes_only; - // Check if we have dna signature templates and not keep_fastas - if sig_templates.dna_size()? == 0 && !keep_fastas { - eprintln!("No DNA signature templates provided, and --keep-fasta is not set."); - proteomes_only = true; - } - // Check if we have protein signature templates not keep_fastas - if sig_templates.anyprotein_size()? == 0 && !keep_fastas { - eprintln!("No protein signature templates provided, and --keep-fasta is not set."); - genomes_only = true; - } - if genomes_only { - // select only DNA templates - let multiselection = MultiSelection::from_moltypes(vec!["dna"])?; - sig_templates = sig_templates.select(&multiselection)?; - - if !download_only { - eprintln!("Downloading and sketching genomes only."); - } else { + if download_only { + if genomes_only { eprintln!("Downloading genomes only."); + } else if proteomes_only { + eprintln!("Downloading proteomes only."); } - } else if proteomes_only { - // select only protein templates - let multiselection = MultiSelection::from_moltypes(vec!["protein", "dayhoff", "hp"])?; - sig_templates = sig_templates.select(&multiselection)?; - if !download_only { + } else { + let sig_template_result = BuildCollection::from_param_str(param_str.as_str()); + sig_templates = match sig_template_result { + Ok(sig_templates) => sig_templates, + Err(e) => { + bail!("Failed to parse params string: {}", e); + } + }; + // Check if we have dna signature templates and not keep_fastas + if sig_templates.dna_size()? == 0 && !keep_fastas { + eprintln!("No DNA signature templates provided, and --keep-fasta is not set."); + proteomes_only = true; + } + // Check if we have protein signature templates not keep_fastas + if sig_templates.anyprotein_size()? == 0 && !keep_fastas { + eprintln!("No protein signature templates provided, and --keep-fasta is not set."); + genomes_only = true; + } + if genomes_only { + // select only templates built from DNA input + let multiselection = MultiSelection::from_input_moltype("DNA")?; + sig_templates.select(&multiselection)?; + eprintln!("Downloading and sketching genomes only."); + } else if proteomes_only { + // select only templates built from protein input + let multiselection = MultiSelection::from_input_moltype("protein")?; + sig_templates.select(&multiselection)?; eprintln!("Downloading and sketching proteomes only."); - } else { - eprintln!("Downloading proteomes only."); + } + if sig_templates.is_empty() && !download_only { + bail!("No signatures to build.") } } - - if sig_templates.is_empty() && !download_only { - bail!("No signatures to build.") - } - // report every 1 percent (or every 1, whichever is larger) let reporting_threshold = std::cmp::max(n_accs / 100, 1); @@ -1053,7 +1020,7 @@ pub async fn gbsketch( &download_path_clone, Some(retry_times), keep_fastas, - &mut sigs, + sigs, genomes_only, proteomes_only, download_only, @@ -1119,6 +1086,7 @@ pub async fn urlsketch( keep_fastas: bool, download_only: bool, batch_size: u32, + n_permits: usize, output_sigs: Option, failed_checksums_csv: Option, ) -> Result<(), anyhow::Error> { @@ -1159,7 +1127,7 @@ pub async fn urlsketch( } // create channels. buffer size here is 4 b/c we can do 3 downloads simultaneously - let (send_sigs, recv_sigs) = tokio::sync::mpsc::channel::(4); + let (send_sigs, recv_sigs) = tokio::sync::mpsc::channel::(4); let (send_failed, recv_failed) = tokio::sync::mpsc::channel::(4); let (send_failed_checksums, recv_failed_checksum) = tokio::sync::mpsc::channel::(4); @@ -1197,7 +1165,7 @@ pub async fn urlsketch( handles.push(error_handle); // Worker tasks - let semaphore = Arc::new(Semaphore::new(3)); // Limiting concurrent downloads + let semaphore = Arc::new(Semaphore::new(n_permits)); // Limiting concurrent downloads let client = Arc::new(Client::new()); // Open the file containing the accessions synchronously @@ -1206,50 +1174,49 @@ pub async fn urlsketch( bail!("No accessions to download and sketch.") } - let sig_template_result = BuildCollection::from_param_str(param_str.as_str()); - let mut sig_templates = match sig_template_result { - Ok(sig_templates) => sig_templates, - Err(e) => { - bail!("Failed to parse params string: {}", e); - } - }; - + // todo: add genomes_only / proteomes_only to the input options + let mut sig_templates = BuildCollection::new(); let mut genomes_only = false; let mut proteomes_only = false; + let dna_multiselection = MultiSelection::from_input_moltype("dna")?; + let protein_multiselection = MultiSelection::from_input_moltype("protein")?; - // Check if we have dna signature templates and not keep_fastas - if sig_templates.dna_size()? == 0 && !keep_fastas { - eprintln!("No DNA signature templates provided, and --keep-fasta is not set."); - proteomes_only = true; - } - // Check if we have protein signature templates not keep_fastas - if sig_templates.anyprotein_size()? == 0 && !keep_fastas { - eprintln!("No protein signature templates provided, and --keep-fasta is not set."); - genomes_only = true; - } - if genomes_only { - // select only DNA templates - let multiselection = MultiSelection::from_moltypes(vec!["dna"])?; - sig_templates = sig_templates.select(&multiselection)?; - - if !download_only { - eprintln!("Downloading and sketching genomes only."); - } else { + if download_only { + if genomes_only { eprintln!("Downloading genomes only."); + } else if proteomes_only { + eprintln!("Downloading proteomes only."); } - } else if proteomes_only { - // select only protein templates - let multiselection = MultiSelection::from_moltypes(vec!["protein", "dayhoff", "hp"])?; - sig_templates = sig_templates.select(&multiselection)?; - if !download_only { + } else { + let sig_template_result = BuildCollection::from_param_str(param_str.as_str()); + sig_templates = match sig_template_result { + Ok(sig_templates) => sig_templates, + Err(e) => { + bail!("Failed to parse params string: {}", e); + } + }; + // Check if we have dna signature templates and not keep_fastas + if sig_templates.dna_size()? == 0 && !keep_fastas { + eprintln!("No DNA signature templates provided, and --keep-fasta is not set."); + proteomes_only = true; + } + // Check if we have protein signature templates not keep_fastas + if sig_templates.anyprotein_size()? == 0 && !keep_fastas { + eprintln!("No protein signature templates provided, and --keep-fasta is not set."); + genomes_only = true; + } + if genomes_only { + // select only DNA templates + sig_templates.select(&dna_multiselection)?; + eprintln!("Downloading and sketching genomes only."); + } else if proteomes_only { + // select only protein templates + sig_templates.select(&protein_multiselection)?; eprintln!("Downloading and sketching proteomes only."); - } else { - eprintln!("Downloading proteomes only."); } - } - - if sig_templates.is_empty() && !download_only { - bail!("No signatures to build.") + if sig_templates.is_empty() && !download_only { + bail!("No signatures to build.") + } } // report every 1 percent (or every 1, whichever is larger) @@ -1267,6 +1234,17 @@ pub async fn urlsketch( } } + // eliminate sigs that won't be added to based on moltype + // this assumes no translation --> modify as needed if adding that. + if accinfo.moltype == InputMolType::Dna { + sigs.select(&dna_multiselection)?; + } else { + sigs.select(&protein_multiselection)?; + } + if sigs.is_empty() && !download_only { + continue; + } + let semaphore_clone = Arc::clone(&semaphore); let client_clone = Arc::clone(&client); let send_sigs = send_sigs.clone(); @@ -1294,7 +1272,7 @@ pub async fn urlsketch( &download_path_clone, Some(retry_times), keep_fastas, - &mut sigs, + sigs, genomes_only, proteomes_only, download_only, diff --git a/src/lib.rs b/src/lib.rs index 648ae69..6c8d251 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -49,7 +49,7 @@ fn set_tokio_thread_pool(num_threads: usize) -> PyResult { #[pyfunction] #[allow(clippy::too_many_arguments)] -#[pyo3(signature = (input_csv, param_str, failed_csv, failed_checksums, retry_times, fasta_location, keep_fastas, genomes_only, proteomes_only, download_only, batch_size, output_sigs=None))] +#[pyo3(signature = (input_csv, param_str, failed_csv, failed_checksums, retry_times, fasta_location, keep_fastas, genomes_only, proteomes_only, download_only, batch_size, n_permits, output_sigs=None))] fn do_gbsketch( py: Python, input_csv: String, @@ -63,6 +63,7 @@ fn do_gbsketch( proteomes_only: bool, download_only: bool, batch_size: u32, + n_permits: usize, output_sigs: Option, ) -> anyhow::Result { match directsketch::gbsketch( @@ -78,6 +79,7 @@ fn do_gbsketch( proteomes_only, download_only, batch_size, + n_permits, output_sigs, ) { Ok(_) => Ok(0), @@ -90,7 +92,7 @@ fn do_gbsketch( #[pyfunction] #[allow(clippy::too_many_arguments)] -#[pyo3(signature = (input_csv, param_str, failed_csv, retry_times, fasta_location, keep_fastas, download_only, batch_size, output_sigs=None, failed_checksums=None))] +#[pyo3(signature = (input_csv, param_str, failed_csv, retry_times, fasta_location, keep_fastas, download_only, batch_size, n_permits, output_sigs=None, failed_checksums=None))] fn do_urlsketch( py: Python, input_csv: String, @@ -101,6 +103,7 @@ fn do_urlsketch( keep_fastas: bool, download_only: bool, batch_size: u32, + n_permits: usize, output_sigs: Option, failed_checksums: Option, ) -> anyhow::Result { @@ -114,6 +117,7 @@ fn do_urlsketch( keep_fastas, download_only, batch_size, + n_permits, output_sigs, failed_checksums, ) { diff --git a/src/python/sourmash_plugin_directsketch/__init__.py b/src/python/sourmash_plugin_directsketch/__init__.py index da88548..d47d7ac 100644 --- a/src/python/sourmash_plugin_directsketch/__init__.py +++ b/src/python/sourmash_plugin_directsketch/__init__.py @@ -65,6 +65,8 @@ def __init__(self, p): help='number of cores to use (default is all available)') p.add_argument('-r', '--retry-times', default=1, type=int, help='number of times to retry failed downloads') + p.add_argument('-n', '--n-simultaneous-downloads', default=1, type=int, choices = [1, 2, 3], + help='number of accessions to download simultaneously (default=1)') group = p.add_mutually_exclusive_group() group.add_argument('-g', '--genomes-only', action='store_true', help='just download and sketch genome (DNA) files') group.add_argument('-m', '--proteomes-only', action='store_true', help='just download and sketch proteome (protein) files') @@ -104,6 +106,7 @@ def main(self, args): args.proteomes_only, args.download_only, args.batch_size, + args.n_simultaneous_downloads, args.output) if status == 0: @@ -126,7 +129,7 @@ def __init__(self, p): p.add_argument('-o', '--output', default=None, help='output zip file for the signatures') p.add_argument('--batch-size', type=non_negative_int, default = 0, - help='Write smaller zipfiles, each containing sigs associated with this number of accessions. \ + help='Write smaller zipfiles, each containing sigs associated with this number of urls. \ This allows urlsketch to recover after unexpected failures, rather than needing to \ restart sketching from scratch. Default: write all sigs to single zipfile.') p.add_argument('-f', '--fastas', @@ -143,6 +146,8 @@ def __init__(self, p): help='number of cores to use (default is all available)') p.add_argument('-r', '--retry-times', default=1, type=int, help='number of times to retry failed downloads') + p.add_argument('-n', '--n-simultaneous-downloads', default=3, type=int, choices = [1, 2, 3], + help='number of simultaneous downloads (default=3)') def main(self, args): @@ -176,6 +181,7 @@ def main(self, args): args.keep_fasta, args.download_only, args.batch_size, + args.n_simultaneous_downloads, args.output, args.checksum_fail) diff --git a/src/utils/buildutils.rs b/src/utils/buildutils.rs index 4120d3d..b4d1d2c 100644 --- a/src/utils/buildutils.rs +++ b/src/utils/buildutils.rs @@ -32,14 +32,29 @@ pub struct MultiSelection { } impl MultiSelection { - /// Create a `MultiSelection` from a single `Selection` - pub fn new(selection: Selection) -> Self { - MultiSelection { - selections: vec![selection], - } + pub fn from_moltypes(moltypes: Vec<&str>) -> Result { + let selections: Result, SourmashError> = moltypes + .into_iter() + .map(|moltype_str| { + let moltype = HashFunctions::try_from(moltype_str)?; + let mut new_selection = Selection::default(); // Create a default Selection + new_selection.set_moltype(moltype); // Set the moltype + Ok(new_selection) + }) + .collect(); + + Ok(MultiSelection { + selections: selections?, + }) } - pub fn from_moltypes(moltypes: Vec<&str>) -> Result { + pub fn from_input_moltype(input_moltype: &str) -> Result { + // currently we don't allow translation. Will need to change this when we do. + // is there a better way to do this? + let mut moltypes = vec!["DNA"]; + if input_moltype == "protein" { + moltypes = vec!["protein", "dayhoff", "hp"]; + } let selections: Result, SourmashError> = moltypes .into_iter() .map(|moltype_str| { @@ -57,9 +72,7 @@ impl MultiSelection { } pub trait MultiSelect { - fn select(self, multi_selection: &MultiSelection) -> Result - where - Self: Sized; + fn select(&mut self, multi_selection: &MultiSelection) -> Result<(), SourmashError>; } #[derive(Debug, Clone, Getters, Setters, Serialize)] @@ -122,10 +135,10 @@ where } } -impl Default for BuildRecord { - fn default() -> Self { - // Default BuildRecord is DNA default - BuildRecord { +impl BuildRecord { + // no general default, but we have defaults for each moltype + pub fn default_dna() -> Self { + Self { internal_location: None, md5: None, md5short: None, @@ -142,21 +155,13 @@ impl Default for BuildRecord { sequence_added: false, } } -} - -impl BuildRecord { - pub fn default_dna() -> Self { - Self { - ..Default::default() - } - } pub fn default_protein() -> Self { Self { moltype: "protein".to_string(), ksize: 10, scaled: 200, - ..Default::default() + ..Self::default_dna() } } @@ -165,7 +170,7 @@ impl BuildRecord { moltype: "dayhoff".to_string(), ksize: 10, scaled: 200, - ..Default::default() + ..Self::default_dna() } } @@ -174,7 +179,7 @@ impl BuildRecord { moltype: "hp".to_string(), ksize: 10, scaled: 200, - ..Default::default() + ..Self::default_dna() } } @@ -189,7 +194,7 @@ impl BuildRecord { num: *record.num(), scaled: *record.scaled(), with_abundance: record.with_abundance(), - ..Default::default() // ignore remaining fields + ..Self::default_dna() // ignore remaining fields } } @@ -219,6 +224,16 @@ impl BuildRecord { valid } + + pub fn params(&self) -> (u32, String, bool, u32, u32) { + ( + self.ksize, + self.moltype.clone(), + self.with_abundance, + self.num, + self.scaled, + ) + } } impl PartialEq for BuildRecord { @@ -272,6 +287,10 @@ impl BuildManifest { self.records.clear(); } + pub fn summarize_params(&self) -> HashSet<(u32, String, bool, u32, u32)> { + self.iter().map(|record| record.params()).collect() + } + pub fn filter_manifest(&self, other: &BuildManifest) -> Self { // Create a HashSet of references to the `BuildRecord`s in `other` let pairs: HashSet<_> = other.records.iter().collect(); @@ -307,7 +326,10 @@ impl BuildManifest { let mut csv_writer = csv::Writer::from_writer(wtr); for record in &self.records { - csv_writer.serialize(record)?; // Serialize each BuildRecord + // don't write empty records (empty template sigs aren't written from BuildCollection) + if record.sequence_added { + csv_writer.serialize(record)?; // Serialize each BuildRecord + } } csv_writer.flush()?; // Ensure all data is written @@ -344,18 +366,15 @@ impl BuildManifest { } impl MultiSelect for BuildManifest { - fn select(self, multi_selection: &MultiSelection) -> Result { - let rows = self.records.iter().filter(|row| { - // for each row, check if it matches any of the Selection structs in MultiSelection + fn select(&mut self, multi_selection: &MultiSelection) -> Result<(), SourmashError> { + // Retain only the records that match any selection + self.records.retain(|record| { multi_selection .selections .iter() - .any(|selection| row.matches_selection(selection)) + .any(|selection| record.matches_selection(selection)) }); - - Ok(BuildManifest { - records: rows.cloned().collect(), - }) + Ok(()) } } @@ -415,25 +434,24 @@ impl BuildCollection { pub fn dna_size(&self) -> Result { let multiselection = MultiSelection::from_moltypes(vec!["dna"])?; - let selected_manifest = self.manifest.clone().select(&multiselection)?; - - Ok(selected_manifest.records.len()) + let mut mf = self.manifest.clone(); // temporary mutable copy + mf.select(&multiselection)?; + Ok(mf.records.len()) } pub fn protein_size(&self) -> Result { let multiselection = MultiSelection::from_moltypes(vec!["protein"])?; - let selected_manifest = self.manifest.clone().select(&multiselection)?; - - Ok(selected_manifest.records.len()) + let mut mf = self.manifest.clone(); // temporary mutable copy + mf.select(&multiselection)?; + Ok(mf.records.len()) } pub fn anyprotein_size(&self) -> Result { let multiselection = MultiSelection::from_moltypes(vec!["protein", "dayhoff", "hp"])?; - let selected_manifest = self.manifest.clone().select(&multiselection)?; - - Ok(selected_manifest.records.len()) + let mut mf = self.manifest.clone(); // temporary mutable copy + mf.select(&multiselection)?; + Ok(mf.records.len()) } - pub fn parse_ksize(value: &str) -> Result { value .parse::() @@ -663,26 +681,6 @@ impl BuildCollection { }); } - // filter template signatures that had no sequence added - // suggested use right before writing signatures - pub fn filter_empty(&mut self) { - let mut sig_index = 0; - - self.manifest.records.retain(|record| { - // Keep only records where `sequence_added` is `true`. - let keep = record.sequence_added; - - if !keep { - // Remove the corresponding signature at the same index if the record is not kept. - self.sigs.remove(sig_index); - } else { - sig_index += 1; // Only increment if we keep the record and signature. - } - - keep - }); - } - pub fn filter(&mut self, params_set: &HashSet) { let mut index = 0; while index < self.manifest.records.len() { @@ -715,6 +713,36 @@ impl BuildCollection { self.manifest.records.iter_mut().zip(self.sigs.iter_mut()) } + fn build_sigs_from_record( + &mut self, + input_moltype: &str, + record: &SequenceRecord, + ) -> Result<()> { + // Optionally use `par_iter_mut` for parallel execution + self.iter_mut().try_for_each(|(rec, sig)| { + if input_moltype == "protein" + && (rec.moltype() == HashFunctions::Murmur64Protein + || rec.moltype() == HashFunctions::Murmur64Dayhoff + || rec.moltype() == HashFunctions::Murmur64Hp) + { + sig.add_protein(&record.seq()) + .context("Failed to add protein")?; + if !rec.sequence_added { + rec.sequence_added = true; + } + } else if (input_moltype == "DNA" || input_moltype == "dna") + && rec.moltype() == HashFunctions::Murmur64Dna + { + sig.add_sequence(&record.seq(), true) + .context("Failed to add sequence")?; + if !rec.sequence_added { + rec.sequence_added = true; + } + } + Ok(()) + }) + } + pub fn build_sigs_from_data( &mut self, data: Vec, @@ -729,24 +757,7 @@ impl BuildCollection { // Iterate over FASTA records and add sequences/proteins to sigs while let Some(record) = fastx_reader.next() { let record = record.context("Failed to read record")?; - self.iter_mut().for_each(|(rec, sig)| { - if input_moltype == "protein" - && (rec.moltype == "protein" || rec.moltype == "dayhoff" || rec.moltype == "hp") - { - sig.add_protein(&record.seq()) - .expect("Failed to add protein"); - if !rec.sequence_added { - rec.sequence_added = true - } - } else if input_moltype == "DNA" && rec.moltype == "DNA" { - sig.add_sequence(&record.seq(), true) - .expect("Failed to add sequence"); - // if not force, panics with 'N' in dna sequence - if !rec.sequence_added { - rec.sequence_added = true - } - } - }); + self.build_sigs_from_record(input_moltype, &record)?; } // After processing sequences, update sig, record information @@ -755,42 +766,37 @@ impl BuildCollection { Ok(()) } - pub fn build_sigs_from_file( + pub fn build_sigs_from_file_or_stdin( &mut self, - input_moltype: &str, // (protein/dna); todo - use hashfns? + input_moltype: &str, // "protein" or "DNA" name: String, filename: String, - ) -> Result<()> { - let mut fastx_reader = parse_fastx_file(&filename)?; - // Iterate over FASTA records and add sequences/proteins to sigs - while let Some(record) = fastx_reader.next() { - let record = record.context("Failed to read record")?; - self.iter_mut().for_each(|(rec, sig)| { - if input_moltype == "protein" - && (rec.moltype() == HashFunctions::Murmur64Protein - || rec.moltype() == HashFunctions::Murmur64Dayhoff - || rec.moltype() == HashFunctions::Murmur64Hp) - { - sig.add_protein(&record.seq()) - .expect("Failed to add protein"); - if !rec.sequence_added { - rec.sequence_added = true - } - } else { - sig.add_sequence(&record.seq(), true) - .expect("Failed to add sequence"); - // if not force, panics with 'N' in dna sequence - if !rec.sequence_added { - rec.sequence_added = true - } - } - }); + ) -> Result { + // Create a FASTX reader from the file or stdin + let mut fastx_reader = if filename == "-" { + let stdin = std::io::stdin(); + parse_fastx_reader(stdin).context("Failed to parse FASTA/FASTQ data from stdin")? + } else { + parse_fastx_file(&filename).context("Failed to open file for FASTA/FASTQ data")? + }; + + // Counter for the number of records processed + let mut record_count: u64 = 0; + + // Parse records and add sequences to signatures + while let Some(record_result) = fastx_reader.next() { + let record = record_result.context("Failed to read a record from input")?; + + self.build_sigs_from_record(input_moltype, &record)?; + + record_count += 1; } - // After processing sequences, update sig, record information + // Update signature and record metadata self.update_info(name, filename); - Ok(()) + // Return the count of records parsed + Ok(record_count) } pub fn build_singleton_sigs( @@ -799,30 +805,11 @@ impl BuildCollection { input_moltype: &str, // (protein/dna); todo - use hashfns? filename: String, ) -> Result<()> { - self.iter_mut().for_each(|(rec, sig)| { - if input_moltype == "protein" - && (rec.moltype() == HashFunctions::Murmur64Protein - || rec.moltype() == HashFunctions::Murmur64Dayhoff - || rec.moltype() == HashFunctions::Murmur64Hp) - { - sig.add_protein(&record.seq()) - .expect("Failed to add protein"); - if !rec.sequence_added { - rec.sequence_added = true - } - } else { - sig.add_sequence(&record.seq(), true) - .expect("Failed to add sequence"); - // if not force, panics with 'N' in dna sequence - if !rec.sequence_added { - rec.sequence_added = true - } - } - }); + self.build_sigs_from_record(input_moltype, &record)?; + // After processing sequences, update sig, record information let record_name = std::str::from_utf8(record.id()) .expect("could not get record id") .to_string(); - // After processing sequences, update sig, record information self.update_info(record_name, filename); Ok(()) @@ -849,7 +836,6 @@ impl BuildCollection { } } - // to do -- use filter_empty to ensure we're not writing empty template sigs?? pub async fn async_write_sigs_to_zip( &mut self, // need mutable to update records zip_writer: &mut ZipFileWriter>, @@ -857,6 +843,10 @@ impl BuildCollection { ) -> Result<()> { // iterate over both records and signatures for (record, sig) in self.iter_mut() { + // skip any empty sig templates (no sequence added) + if !record.sequence_added { + continue; + } let md5sum_str = sig.md5sum(); let count = md5sum_occurrences.entry(md5sum_str.clone()).or_insert(0); *count += 1; @@ -917,57 +907,25 @@ impl<'a> IntoIterator for &'a mut BuildCollection { } impl MultiSelect for BuildCollection { - // to do --> think through the best/most efficient way to do this // in sourmash core, we don't need to select sigs themselves. Is this due to the way that Idx/Storage work? - fn select(mut self, multi_selection: &MultiSelection) -> Result { - // Collect indices while retaining matching records - let mut selected_indices = Vec::new(); - let mut current_index = 0; - + fn select(&mut self, multi_selection: &MultiSelection) -> Result<(), SourmashError> { + // Retain records and sigs in place + let mut i = 0; self.manifest.records.retain(|record| { let keep = multi_selection .selections .iter() .any(|selection| record.matches_selection(selection)); - if keep { - selected_indices.push(current_index); // Collect the index of the retained record + if !keep { + self.sigs.remove(i); // Remove corresponding signature + } else { + i += 1; } - - current_index += 1; // Move to the next index - keep // Retain the record if it matches the selection - }); - - // Retain corresponding signatures using the collected indices - let mut sig_index = 0; - self.sigs.retain(|_sig| { - let keep = selected_indices.contains(&sig_index); - sig_index += 1; keep }); - Ok(self) - } -} - -#[derive(Debug, Clone)] -pub struct MultiBuildCollection { - pub collections: Vec, -} - -impl MultiBuildCollection { - pub fn new() -> Self { - MultiBuildCollection { - collections: Vec::new(), - } - } - - pub fn is_empty(&self) -> bool { - self.collections.is_empty() - } - - pub fn add_collection(&mut self, collection: &mut BuildCollection) { - self.collections.push(collection.clone()) + Ok(()) } } @@ -1025,19 +983,19 @@ mod tests { ksize: 31, moltype: "DNA".to_string(), with_abundance: true, - ..Default::default() + ..BuildRecord::default_dna() }, BuildRecord { ksize: 21, moltype: "DNA".to_string(), with_abundance: true, - ..Default::default() + ..BuildRecord::default_dna() }, BuildRecord { ksize: 51, moltype: "DNA".to_string(), with_abundance: true, - ..Default::default() + ..BuildRecord::default_dna() }, BuildRecord::default_protein(), ]; @@ -1228,14 +1186,14 @@ mod tests { ksize: 21, moltype: "DNA".to_string(), scaled: 1000, - ..Default::default() + ..BuildRecord::default_dna() }; let rec3 = BuildRecord { ksize: 31, moltype: "DNA".to_string(), scaled: 1000, with_abundance: true, - ..Default::default() + ..BuildRecord::default_dna() }; let bmanifest = BuildManifest { @@ -1272,7 +1230,7 @@ mod tests { moltype: "DNA".to_string(), scaled: 1000, with_abundance: true, - ..Default::default() + ..BuildRecord::default_dna() }; // Add the DNA record to the collection with a matching moltype. @@ -1294,7 +1252,7 @@ mod tests { moltype: "protein".to_string(), scaled: 200, with_abundance: false, - ..Default::default() + ..BuildRecord::default_dna() }; // Add the protein record to the collection with a matching moltype. @@ -1316,17 +1274,9 @@ mod tests { moltype: "dayhoff".to_string(), scaled: 200, with_abundance: true, - ..Default::default() + ..BuildRecord::default_dna() }; - // Attempt to add the non-matching record with "DNA" as input moltype. - // this is because we currently don't allow translation - // build_collection.add_template_sig_from_record(&non_matching_record, "DNA"); - - // Verify that the non-matching record was not added. - // assert_eq!(build_collection.manifest.records.len(), 2); - // assert_eq!(build_collection.sigs.len(), 2); - // Add the same non-matching record with a matching input moltype. build_collection.add_template_sig_from_record(&non_matching_record); @@ -1339,62 +1289,4 @@ mod tests { assert_eq!(added_dayhoff_record.ksize, 10); assert_eq!(added_dayhoff_record.with_abundance, true); } - - #[test] - fn test_filter_empty() { - // Create a parameter string that generates BuildRecords with different `sequence_added` values. - let params_str = "k=31,abund,dna_k=21,protein_k=10,abund"; - - // Use `from_param_str` to build a `BuildCollection`. - let mut build_collection = BuildCollection::from_param_str(params_str) - .expect("Failed to build BuildCollection from params_str"); - - // Manually set `sequence_added` for each record to simulate different conditions. - build_collection.manifest.records[0].sequence_added = true; // Keep this record. - build_collection.manifest.records[1].sequence_added = false; // This record should be removed. - build_collection.manifest.records[2].sequence_added = true; // Keep this record. - - // Check initial sizes before filtering. - assert_eq!( - build_collection.manifest.records.len(), - 3, - "Expected 3 records before filtering, but found {}", - build_collection.manifest.records.len() - ); - assert_eq!( - build_collection.sigs.len(), - 3, - "Expected 3 signatures before filtering, but found {}", - build_collection.sigs.len() - ); - - // Apply the `filter_empty` method. - build_collection.filter_empty(); - - // After filtering, only the records with `sequence_added == true` should remain. - assert_eq!( - build_collection.manifest.records.len(), - 2, - "Expected 2 records after filtering, but found {}", - build_collection.manifest.records.len() - ); - - // Check that the signatures also match the remaining records. - assert_eq!( - build_collection.sigs.len(), - 2, - "Expected 2 signatures after filtering, but found {}", - build_collection.sigs.len() - ); - - // Verify that the remaining records have `sequence_added == true`. - assert!( - build_collection - .manifest - .records - .iter() - .all(|rec| rec.sequence_added), - "All remaining records should have `sequence_added == true`" - ); - } } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 041f8be..6c7fa18 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -2,13 +2,12 @@ use anyhow::{anyhow, Result}; use reqwest::Url; use sourmash::collection::Collection; use std::collections::HashMap; -// use std::collections::HashSet; use std::fmt; pub mod buildutils; use crate::utils::buildutils::{BuildManifest, BuildRecord}; -#[derive(Clone)] +#[derive(Clone, PartialEq)] pub enum InputMolType { Dna, Protein, diff --git a/tests/test_gbsketch.py b/tests/test_gbsketch.py index 9a189f5..f7ead2b 100644 --- a/tests/test_gbsketch.py +++ b/tests/test_gbsketch.py @@ -276,22 +276,13 @@ def test_gbsketch_save_fastas(runtmp): else: assert sig.md5sum() == ss3.md5sum() + def test_gbsketch_download_only(runtmp, capfd): acc_csv = get_test_data('acc.csv') - output = runtmp.output('simple.zip') failed = runtmp.output('failed.csv') out_dir = runtmp.output('out_fastas') ch_fail = runtmp.output('checksum_dl_failed.csv') - - sig1 = get_test_data('GCA_000175535.1.sig.gz') - sig2 = get_test_data('GCA_000961135.2.sig.gz') - sig3 = get_test_data('GCA_000961135.2.protein.sig.gz') - ss1 = sourmash.load_one_signature(sig1, ksize=31) - ss2 = sourmash.load_one_signature(sig2, ksize=31) - # why does this need ksize =30 and not ksize = 10!??? - ss3 = sourmash.load_one_signature(sig3, ksize=30, select_moltype='protein') - runtmp.sourmash('scripts', 'gbsketch', acc_csv, '--download-only', '--failed', failed, '-r', '1', '--fastas', out_dir, '--keep-fasta', '--checksum-fail', ch_fail, @@ -301,6 +292,7 @@ def test_gbsketch_download_only(runtmp, capfd): fa_files = os.listdir(out_dir) assert set(fa_files) == set(['GCA_000175535.1_genomic.fna.gz', 'GCA_000961135.2_protein.faa.gz', 'GCA_000961135.2_genomic.fna.gz']) captured = capfd.readouterr() + print(captured) assert "Failed to send signatures: channel closed" not in captured.err @@ -643,7 +635,6 @@ def test_gbsketch_simple_batched_multiple(runtmp, capfd): sig3 = get_test_data('GCA_000961135.2.protein.sig.gz') ss1 = sourmash.load_one_signature(sig1, ksize=31) ss2 = sourmash.load_one_signature(sig2, ksize=31) - # why does this need ksize =30 and not ksize = 10!??? ss3 = sourmash.load_one_signature(sig3, ksize=30, select_moltype='protein') runtmp.sourmash('scripts', 'gbsketch', acc_csv, '-o', output, @@ -791,7 +782,7 @@ def test_gbsketch_simple_batch_restart_with_incomplete_zip(runtmp, capfd): assert not os.path.exists(output) # for now, orig output file should be empty. captured = capfd.readouterr() print(captured.err) - assert f"Warning: Invalid zip file '{out2}'; skipping." in captured.err + assert f"Warning: Failed to load zip file '{out2}'; skipping." in captured.err # we created this one with sig cat idx = sourmash.load_file_as_index(out1) diff --git a/tests/test_urlsketch.py b/tests/test_urlsketch.py index a1d354c..18685ba 100644 --- a/tests/test_urlsketch.py +++ b/tests/test_urlsketch.py @@ -636,7 +636,7 @@ def test_urlsketch_simple_batch_restart_with_incomplete_zip(runtmp, capfd): assert not os.path.exists(output) # for now, orig output file should be empty. captured = capfd.readouterr() print(captured.err) - assert f"Warning: Invalid zip file '{out2}'; skipping." in captured.err + assert f"Warning: Failed to load zip file '{out2}'; skipping." in captured.err expected_siginfo = { (ss2.name, ss2.md5sum(), ss2.minhash.moltype),