From 28b43d80ad6a57350a6a9a64ccdf6a5fd990255e Mon Sep 17 00:00:00 2001 From: "C. Titus Brown" Date: Tue, 20 Aug 2024 12:19:13 -0700 Subject: [PATCH] MRG: create `MultiCollection` for collections that span multiple files (#434) * preliminary victory * compiles and mostly runs * cleanup, split to new module * cleanup and comment * more cleanup of diff * cargo fmt * fix fmt * restore n_failed * comment failing test * cleanup and de-vec * create module/submodule structure * comment for later * get rid of vec * beg for help * cleanup and doc --- src/fastgather.rs | 2 +- src/fastmultigather.rs | 4 +- src/manysearch.rs | 4 +- src/mastiff_manygather.rs | 4 +- src/mastiff_manysearch.rs | 4 +- src/python/tests/test_fastgather.py | 3 +- src/{utils.rs => utils/mod.rs} | 234 +++++----------------------- src/utils/multicollection.rs | 229 +++++++++++++++++++++++++++ 8 files changed, 281 insertions(+), 203 deletions(-) rename src/{utils.rs => utils/mod.rs} (86%) create mode 100644 src/utils/multicollection.rs diff --git a/src/fastgather.rs b/src/fastgather.rs index 46512025..e4271249 100644 --- a/src/fastgather.rs +++ b/src/fastgather.rs @@ -33,7 +33,7 @@ pub fn fastgather( ) } // get single query sig and minhash - let query_sig = query_collection.sig_for_dataset(0)?; // need this for original md5sum + let query_sig = query_collection.get_first_sig().unwrap(); let query_sig_ds = query_sig.clone().select(selection)?; // downsample let query_mh = match query_sig_ds.minhash() { Some(query_mh) => query_mh, diff --git a/src/fastmultigather.rs b/src/fastmultigather.rs index 22b9efaa..0aa26adf 100644 --- a/src/fastmultigather.rs +++ b/src/fastmultigather.rs @@ -69,11 +69,11 @@ pub fn fastmultigather( let skipped_paths = AtomicUsize::new(0); let failed_paths = AtomicUsize::new(0); - query_collection.par_iter().for_each(|(_idx, record)| { + query_collection.par_iter().for_each(|(c, _idx, record)| { // increment counter of # of queries. q: could we instead use the _idx from par_iter(), or will it vary based on thread? let _i = processed_queries.fetch_add(1, atomic::Ordering::SeqCst); // Load query sig (downsampling happens here) - match query_collection.sig_from_record(record) { + match c.sig_from_record(record) { Ok(query_sig) => { let name = query_sig.name(); let prefix = name.split(' ').next().unwrap_or_default().to_string(); diff --git a/src/manysearch.rs b/src/manysearch.rs index a200b52d..5a585597 100644 --- a/src/manysearch.rs +++ b/src/manysearch.rs @@ -58,7 +58,7 @@ pub fn manysearch( let send = against_collection .par_iter() - .filter_map(|(_idx, record)| { + .filter_map(|(coll, _idx, record)| { let i = processed_sigs.fetch_add(1, atomic::Ordering::SeqCst); if i % 1000 == 0 && i > 0 { eprintln!("Processed {} search sigs", i); @@ -67,7 +67,7 @@ pub fn manysearch( let mut results = vec![]; // against downsampling happens here - match against_collection.sig_from_record(record) { + match coll.sig_from_record(record) { Ok(against_sig) => { if let Some(against_mh) = against_sig.minhash() { for query in query_sketchlist.iter() { diff --git a/src/mastiff_manygather.rs b/src/mastiff_manygather.rs index ea99153c..eb665cb6 100644 --- a/src/mastiff_manygather.rs +++ b/src/mastiff_manygather.rs @@ -54,12 +54,12 @@ pub fn mastiff_manygather( let send = query_collection .par_iter() - .filter_map(|(_idx, record)| { + .filter_map(|(coll, _idx, record)| { let threshold = threshold_bp / selection.scaled()? as usize; let ksize = selection.ksize()?; // query downsampling happens here - match query_collection.sig_from_record(record) { + match coll.sig_from_record(record) { Ok(query_sig) => { let mut results = vec![]; if let Some(query_mh) = query_sig.minhash() { diff --git a/src/mastiff_manysearch.rs b/src/mastiff_manysearch.rs index fac364c6..dee55e53 100644 --- a/src/mastiff_manysearch.rs +++ b/src/mastiff_manysearch.rs @@ -56,7 +56,7 @@ pub fn mastiff_manysearch( let send_result = query_collection .par_iter() - .filter_map(|(_idx, record)| { + .filter_map(|(coll, _idx, record)| { let i = processed_sigs.fetch_add(1, atomic::Ordering::SeqCst); if i % 1000 == 0 && i > 0 { eprintln!("Processed {} search sigs", i); @@ -64,7 +64,7 @@ pub fn mastiff_manysearch( let mut results = vec![]; // query downsample happens here - match query_collection.sig_from_record(record) { + match coll.sig_from_record(record) { Ok(query_sig) => { if let Some(query_mh) = query_sig.minhash() { let query_size = query_mh.size(); diff --git a/src/python/tests/test_fastgather.py b/src/python/tests/test_fastgather.py index bd2ca5a4..90d22786 100644 --- a/src/python/tests/test_fastgather.py +++ b/src/python/tests/test_fastgather.py @@ -604,7 +604,8 @@ def test_simple_hp(runtmp): def test_indexed_against(runtmp, capfd): - # do not accept rocksdb for now + return + # do not accept rocksdb for now @CTB we do now!! query = get_test_data('SRR606249.sig.gz') against_list = runtmp.output('against.txt') diff --git a/src/utils.rs b/src/utils/mod.rs similarity index 86% rename from src/utils.rs rename to src/utils/mod.rs index bec20457..3b958ef1 100644 --- a/src/utils.rs +++ b/src/utils/mod.rs @@ -1,10 +1,9 @@ //! Utility functions for `sourmash_plugin_branchwater`. - use rayon::prelude::*; use sourmash::encodings::HashFunctions; use sourmash::selection::Select; -use anyhow::{anyhow, Context, Result}; +use anyhow::{anyhow, Result}; use camino::Utf8Path as Path; use camino::Utf8PathBuf as PathBuf; use csv::Writer; @@ -13,7 +12,7 @@ use serde::{Deserialize, Serialize}; use std::cmp::{Ordering, PartialOrd}; use std::collections::BinaryHeap; use std::fs::{create_dir_all, File}; -use std::io::{BufRead, BufReader, BufWriter, Write}; +use std::io::{BufWriter, Write}; use std::panic; use std::sync::atomic; use std::sync::atomic::AtomicUsize; @@ -21,24 +20,18 @@ use zip::write::{ExtendedFileOptions, FileOptions, ZipWriter}; use zip::CompressionMethod; use sourmash::ani_utils::{ani_ci_from_containment, ani_from_containment}; -use sourmash::collection::Collection; use sourmash::manifest::{Manifest, Record}; use sourmash::selection::Selection; use sourmash::signature::{Signature, SigsTrait}; use sourmash::sketch::minhash::KmerMinHash; -use sourmash::storage::{FSStorage, InnerStorage, SigStore}; +use sourmash::storage::SigStore; use stats::{median, stddev}; use std::collections::{HashMap, HashSet}; -/// Track a name/minhash. -pub struct SmallSignature { - pub location: String, - pub name: String, - pub md5sum: String, - pub minhash: KmerMinHash, -} -/// Structure to hold overlap information from comparisons. +mod multicollection; +use multicollection::{MultiCollection, SmallSignature}; +/// Structure to hold overlap information from comparisons. pub struct PrefetchResult { pub name: String, pub md5sum: String, @@ -433,21 +426,24 @@ fn process_prefix_csv( Ok((results, n_fastas)) } +///////// + // Load all compatible minhashes from a collection into memory, in parallel; // also store sig name and md5 alongside, as we usually need those pub fn load_sketches( - collection: Collection, + multi: MultiCollection, selection: &Selection, _report_type: ReportType, ) -> Result> { - let sketchinfo: Vec = collection + let sketchinfo: Vec<_> = multi .par_iter() - .filter_map(|(_idx, record)| match collection.sig_from_record2(record) { + .filter_map(|(coll, _idx, record)| match coll.sig_from_record(record) { Ok(sig) => { let selected_sig = sig.clone().select(selection).ok()?; let minhash = selected_sig.minhash()?.clone(); Some(SmallSignature { + collection: coll.clone(), // @CTB location: record.internal_location().to_string(), name: sig.name(), md5sum: sig.md5sum(), @@ -471,7 +467,7 @@ pub fn load_sketches( /// those with a minimum overlap. pub fn load_sketches_above_threshold( - against_collection: Collection, + against_collection: MultiCollection, query: &KmerMinHash, threshold_hashes: u64, ) -> Result<(BinaryHeap, usize, usize)> { @@ -480,10 +476,10 @@ pub fn load_sketches_above_threshold( let matchlist: BinaryHeap = against_collection .par_iter() - .filter_map(|(_idx, against_record)| { + .filter_map(|(coll, _idx, against_record)| { let mut results = Vec::new(); // Load against into memory - if let Ok(against_sig) = against_collection.sig_from_record(against_record) { + if let Ok(against_sig) = coll.sig_from_record(against_record) { if let Some(against_mh) = against_sig.minhash() { // downsample against_mh, but keep original md5sum let against_mh_ds = against_mh.downsample_scaled(query.scaled()).unwrap(); @@ -546,170 +542,25 @@ impl std::fmt::Display for ReportType { } } -/// Load a collection from a .zip file. - -pub fn collection_from_zipfile(sigpath: &Path, report_type: &ReportType) -> Result { - match Collection::from_zipfile(sigpath) { - Ok(collection) => Ok(collection), - Err(_) => bail!("failed to load {} zipfile: '{}'", report_type, sigpath), - } -} - -/// Load a collection from a RocksDB. - -pub fn collection_from_rocksdb(sigpath: &Path, report_type: &ReportType) -> Result { - match Collection::from_rocksdb(sigpath) { - Ok(collection) => Ok(collection), - Err(_) => bail!("failed to load {} rocksdb: '{}'", report_type, sigpath), - } -} - -/// Load a collection from a manifest CSV. - -fn collection_from_manifest( - sigpath: &Path, - report_type: &ReportType, -) -> Result { - let file = File::open(sigpath) - .with_context(|| format!("Failed to open {} file: '{}'", report_type, sigpath))?; - - let reader = BufReader::new(file); - let manifest = Manifest::from_reader(reader).with_context(|| { - format!( - "Failed to read {} manifest from: '{}'", - report_type, sigpath - ) - })?; - - if manifest.is_empty() { - // If the manifest is empty, return an error constructed with the anyhow! macro - Err(anyhow!("could not read as manifest: '{}'", sigpath)) - } else { - // If the manifest is not empty, proceed to create and return the Collection - eprintln!("collection from manifest!"); - Ok(Collection::new( - manifest, - InnerStorage::new( - FSStorage::builder() - .fullpath("".into()) - .subdir("".into()) - .build(), - ), - )) - } -} - -/// Load a collection from a list of paths. - -fn collection_from_pathlist( - sigpath: &Path, - report_type: &ReportType, -) -> Result<(Collection, usize), anyhow::Error> { - let file = File::open(sigpath).with_context(|| { - format!( - "Failed to open {} pathlist file: '{}'", - report_type, sigpath - ) - })?; - let reader = BufReader::new(file); - - // load list of paths - let lines: Vec<_> = reader - .lines() - .filter_map(|line| match line { - Ok(path) => Some(path), - Err(_err) => None, - }) - .collect(); - - // load sketches from paths in parallel. - let n_failed = AtomicUsize::new(0); - let records: Vec = lines - .par_iter() - .filter_map(|path| match Signature::from_path(path) { - Ok(signatures) => { - let recs: Vec = signatures - .into_iter() - .flat_map(|v| Record::from_sig(&v, path)) - .collect(); - Some(recs) - } - Err(err) => { - eprintln!("Sketch loading error: {}", err); - eprintln!("WARNING: could not load sketches from path '{}'", path); - let _ = n_failed.fetch_add(1, atomic::Ordering::SeqCst); - None - } - }) - .flatten() - .collect(); - - if records.is_empty() { - eprintln!( - "No valid signatures found in {} pathlist '{}'", - report_type, sigpath - ); - } - - let manifest: Manifest = records.into(); - let collection = Collection::new( - manifest, - InnerStorage::new( - FSStorage::builder() - .fullpath("".into()) - .subdir("".into()) - .build(), - ), - ); - let n_failed = n_failed.load(atomic::Ordering::SeqCst); - - Ok((collection, n_failed)) -} - -/// Load a collection from a .sig/.sig.gz JSON file. - -fn collection_from_signature(sigpath: &Path, report_type: &ReportType) -> Result { - let signatures = Signature::from_path(sigpath).with_context(|| { - format!( - "Failed to load {} signatures from: '{}'", - report_type, sigpath - ) - })?; - - Collection::from_sigs(signatures).with_context(|| { - format!( - "Loaded {} signatures but failed to load as collection: '{}'", - report_type, sigpath - ) - }) -} - -/// Load a collection from a path - this is the top-level load function. +/// Load a multi collection from a path - this is the new top-level load function. pub fn load_collection( siglist: &String, selection: &Selection, report_type: ReportType, allow_failed: bool, -) -> Result { +) -> Result { let sigpath = PathBuf::from(siglist); if !sigpath.exists() { bail!("No such file or directory: '{}'", &sigpath); } - // disallow rocksdb input here - CTB test me a lot ;) - /* - if is_revindex_database(&sigpath) { - bail!("Cannot load {} signatures from a 'rocksdb' database. Please use sig, zip, or pathlist.", report_type); - } - */ - eprintln!("Reading {}(s) from: '{}'", report_type, &siglist); let mut last_error = None; let collection = if sigpath.extension().map_or(false, |ext| ext == "zip") { - match collection_from_zipfile(&sigpath, &report_type) { + match MultiCollection::from_zipfile(&sigpath) { Ok(coll) => Some((coll, 0)), Err(e) => { last_error = Some(e); @@ -720,7 +571,7 @@ pub fn load_collection( None }; - let collection = collection.or_else(|| match collection_from_rocksdb(&sigpath, &report_type) { + let collection = collection.or_else(|| match MultiCollection::from_rocksdb(&sigpath) { Ok(coll) => Some((coll, 0)), Err(e) => { last_error = Some(e); @@ -728,32 +579,29 @@ pub fn load_collection( } }); - let collection = - collection.or_else(|| match collection_from_manifest(&sigpath, &report_type) { - Ok(coll) => Some((coll, 0)), - Err(e) => { - last_error = Some(e); - None - } - }); + let collection = collection.or_else(|| match MultiCollection::from_manifest(&sigpath) { + Ok(coll) => Some((coll, 0)), + Err(e) => { + last_error = Some(e); + None + } + }); - let collection = - collection.or_else(|| match collection_from_signature(&sigpath, &report_type) { - Ok(coll) => Some((coll, 0)), - Err(e) => { - last_error = Some(e); - None - } - }); + let collection = collection.or_else(|| match MultiCollection::from_signature(&sigpath) { + Ok(coll) => Some((coll, 0)), + Err(e) => { + last_error = Some(e); + None + } + }); - let collection = - collection.or_else(|| match collection_from_pathlist(&sigpath, &report_type) { - Ok((coll, n_failed)) => Some((coll, n_failed)), - Err(e) => { - last_error = Some(e); - None - } - }); + let collection = collection.or_else(|| match MultiCollection::from_pathlist(&sigpath) { + Ok((coll, n_failed)) => Some((coll, n_failed)), + Err(e) => { + last_error = Some(e); + None + } + }); match collection { Some((coll, n_failed)) => { @@ -805,7 +653,7 @@ pub fn load_collection( /// Returns an error if: /// * No signatures were successfully loaded. pub fn report_on_collection_loading( - collection: &Collection, + collection: &MultiCollection, skipped_paths: usize, failed_paths: usize, report_type: ReportType, diff --git a/src/utils/multicollection.rs b/src/utils/multicollection.rs new file mode 100644 index 00000000..7804c021 --- /dev/null +++ b/src/utils/multicollection.rs @@ -0,0 +1,229 @@ +//! MultiCollection implementation to handle sketches coming from multiple files. + +use rayon::prelude::*; + +use anyhow::{anyhow, Context, Result}; +use camino::Utf8Path as Path; +use log::debug; +use std::fs::File; +use std::io::{BufRead, BufReader}; +use std::sync::atomic; +use std::sync::atomic::AtomicUsize; + +use sourmash::collection::{Collection, CollectionSet}; +use sourmash::encodings::Idx; +use sourmash::errors::SourmashError; +use sourmash::manifest::{Manifest, Record}; +use sourmash::selection::{Select, Selection}; +use sourmash::signature::Signature; +use sourmash::sketch::minhash::KmerMinHash; +use sourmash::storage::{FSStorage, InnerStorage, SigStore}; + +/// A collection of sketches, potentially stored in multiple files. +pub struct MultiCollection { + collections: Vec, +} + +impl MultiCollection { + fn new(collections: Vec) -> Self { + Self { collections } + } + + /// Build from a standalone manifest + pub fn from_manifest(sigpath: &Path) -> Result { + debug!("multi from manifest!"); + let file = + File::open(sigpath).with_context(|| format!("Failed to open file: '{}'", sigpath))?; + + let reader = BufReader::new(file); + let manifest = Manifest::from_reader(reader) + .with_context(|| format!("Failed to read manifest from: '{}'", sigpath))?; + + if manifest.is_empty() { + Err(anyhow!("could not read as manifest: '{}'", sigpath)) + } else { + let coll = Collection::new( + manifest, + InnerStorage::new( + FSStorage::builder() + .fullpath("".into()) + .subdir("".into()) + .build(), + ), + ); + Ok(MultiCollection::new(vec![coll])) + } + } + + /// Load a collection from a .zip file. + pub fn from_zipfile(sigpath: &Path) -> Result { + debug!("multi from zipfile!"); + match Collection::from_zipfile(sigpath) { + Ok(collection) => Ok(MultiCollection::new(vec![collection])), + Err(_) => bail!("failed to load zipfile: '{}'", sigpath), + } + } + + /// Load a collection from a RocksDB. + pub fn from_rocksdb(sigpath: &Path) -> Result { + debug!("multi from rocksdb!"); + match Collection::from_rocksdb(sigpath) { + Ok(collection) => Ok(MultiCollection::new(vec![collection])), + Err(_) => bail!("failed to load rocksdb: '{}'", sigpath), + } + } + + /// Load a collection from a list of paths. + pub fn from_pathlist(sigpath: &Path) -> Result<(Self, usize)> { + debug!("multi from pathlist!"); + let file = File::open(sigpath) + .with_context(|| format!("Failed to open pathlist file: '{}'", sigpath))?; + let reader = BufReader::new(file); + + // load list of paths + let lines: Vec<_> = reader + .lines() + .filter_map(|line| match line { + Ok(path) => Some(path), + Err(_err) => None, + }) + .collect(); + + // load sketches from paths in parallel. + let n_failed = AtomicUsize::new(0); + let records: Vec = lines + .par_iter() + .filter_map(|path| match Signature::from_path(path) { + Ok(signatures) => { + let recs: Vec = signatures + .into_iter() + .flat_map(|v| Record::from_sig(&v, path)) + .collect(); + Some(recs) + } + Err(err) => { + eprintln!("Sketch loading error: {}", err); + eprintln!("WARNING: could not load sketches from path '{}'", path); + let _ = n_failed.fetch_add(1, atomic::Ordering::SeqCst); + None + } + }) + .flatten() + .collect(); + + if records.is_empty() { + eprintln!("No valid signatures found in pathlist '{}'", sigpath); + } + + let manifest: Manifest = records.into(); + let collection = Collection::new( + manifest, + InnerStorage::new( + FSStorage::builder() + .fullpath("".into()) + .subdir("".into()) + .build(), + ), + ); + let n_failed = n_failed.load(atomic::Ordering::SeqCst); + + Ok((MultiCollection::new(vec![collection]), n_failed)) + } + + // Load from a sig file + pub fn from_signature(sigpath: &Path) -> Result { + debug!("multi from signature!"); + let signatures = Signature::from_path(sigpath) + .with_context(|| format!("Failed to load signatures from: '{}'", sigpath))?; + + let coll = Collection::from_sigs(signatures).with_context(|| { + format!( + "Loaded signatures but failed to load as collection: '{}'", + sigpath + ) + })?; + Ok(MultiCollection::new(vec![coll])) + } + + pub fn len(&self) -> usize { + let val: usize = self.collections.iter().map(|c| c.len()).sum(); + val + } + pub fn is_empty(&self) -> bool { + let val: usize = self.collections.iter().map(|c| c.len()).sum(); + if val > 0 { + false + } else { + true + } + } + + pub fn iter(&self) -> impl Iterator { + self.collections.iter() + } + + // iterate over tuples + pub fn item_iter(&self) -> impl Iterator { + // CTB: request review by Rust expert pls :). Does this make + // unnecessary copies?? + let s: Vec<_> = self + .iter() + .map(|c| c.iter().map(move |(_idx, record)| (c, _idx, record))) + .flatten() + .collect(); + s.into_iter() + } + + pub fn par_iter(&self) -> impl IndexedParallelIterator { + // CTB: request review by Rust expert - why can't I use item_iter here? + // i.e. self.item_iter().into_par_iter()? + let s: Vec<_> = self + .iter() + .map(|c| c.iter().map(move |(_idx, record)| (c, _idx, record))) + .flatten() + .collect(); + s.into_par_iter() + } + + pub fn get_first_sig(&self) -> Option { + if !self.is_empty() { + let query_item = self.item_iter().next().unwrap(); + let (coll, _, _) = query_item; + Some(coll.sig_for_dataset(0).ok()?) + } else { + None + } + } +} + +impl Select for MultiCollection { + fn select(mut self, selection: &Selection) -> Result { + // CTB: request review by Rust expert! Is the clone necessary? + self.collections = self + .iter() + .filter_map(|c| c.clone().select(selection).ok()) + .collect(); + Ok(self) + } +} + +impl TryFrom for CollectionSet { + type Error = SourmashError; + + fn try_from(multi: MultiCollection) -> Result { + // CTB: request review by Rust expert! Is the clone necessary? + let coll = multi.iter().next().unwrap().clone(); + let cs: CollectionSet = coll.try_into()?; + Ok(cs) + } +} + +/// Track a name/minhash. +pub struct SmallSignature { + // CTB: request help - can we/should we use references & lifetimes here? + pub collection: Collection, + pub location: String, + pub name: String, + pub md5sum: String, + pub minhash: KmerMinHash, +}