MRG: add generic support for any type of sketch collection as query or database (#430)

* refactor & rename & consolidate

* remove 'lower'

* add cargo doc output for private fn

* add a few comments/docs

* switch to dev version of sourmash

* tracking

* cleaner

* cleanup

* load rocksdb natively

* foo

* cargo fmt

* upd

* upd

* fix fmt

* MRG: create `MultiCollection` for collections that span multiple files (#434)

* preliminary victory

* compiles and mostly runs

* cleanup, split to new module

* cleanup and comment

* more cleanup of diff

* cargo fmt

* fix fmt

* restore n_failed

* comment failing test

* cleanup and de-vec

* create module/submodule structure

* comment for later

* get rid of vec

* beg for help

* cleanup and doc

* clippy fixes

* compiling again

* cleanup

* bump sourmash to v0.15.1

* check if is rocksdb

* weird error

* use remove_unwrap branch of sourmash

* get index to work with MultiCollection

* old bug now fixed

* clippy, format, and fix

* make names clearer

* ditch MultiCollection for index, at least for now

* testy testy

* getting closer

* update sourmash

* mark failing tests

* upd

* cargo fmt

* MRG: test exit from `pairwise` and `multisearch` if no loaded sketches (#437)

* upd

* check for appropriate multisearch error exits

* add more tests for pairwise, too

* cargo fmt

* MRG: switch to more efficient use of `Collection` by removing cloning (#438)

* remove unnecessary clones by switch to references in SmallSignature

* switch away from references for collections => avoid clones

* remove MultiCollection::iter

* MRG: add tests for RocksDB/RevIndex, standalone manifests, and flexible pathlists (#436)

* test using rocksdb as source of sketches

* test file lists of zips

* cargo fmt

* hackity hack hack a picklist

* ok that makes more sense

* it works

* comments around future par_iter

* support loading from a .sig.gz for index

* test pairwise loading from rocksdb

* add test for queries from Rocksdb

* decide not to implement lists of manifests :)

* reenable and fix test_fastgather.py::test_indexed_against

* impl Deref for MultiCollection

* clippy

* switch to using load_sketches method

* deref doesn't actually make sense for MultiCollection

* update to latest sourmash code

* update to latest sourmash code

* simplify

* update to latest sourmash code

* remove unnecessary flag

* MRG: support & test loading of standalone manifests within pathlists (#450)

* use recursion to load paths into a MultiCollection => mf support

* MRG: clean up index to use `MultiCollection` (#451)

* try making index work with standard code

* kinda working

* fmt

* refactor

* clear up the tests

* refactor/clean up

* cargo fmt

* add tests for index warning & error

* comment

* MRG: documentation updates based on new collection loading (#444)

* update docs for #430

* upd documentation

* upd

* Update src/lib.rs

Co-authored-by: Tessa Pierce Ward <[email protected]>

* switch unwrap to expect

* move unwrap to expect

* minor cleanup

* cargo fmt

* provide legacy method to avoid xfail on index loading

* switch to using reference

* update docs to reflect pathlist behavior

* test recursive nature of MultiCollection

* re-enable test that is now passing

* update to latest sourmash

* upd sourmash

* update sourmash

* mut MultiCollection

* cleanup

* update after merge of sourmash-bio/sourmash#3305

* fix contains_revindex

* add trace commands for tracing loading

* use released version of sourmash

* add support for ignoring abundance

* cargo fmt

* avoid downsampling until we know there is overlap

* change downsample to true; add panic assertion

* move downsampling side guard

* eliminate redundant overlap check

* move calc_abund_stats

* extract abundance code into own function; avoid downsampling if poss

* cleanup

* fmt

* update to next sourmash release

* cargo fmt

* upd sourmash

* correct numbers

* upd sourmash

* upd sourmash

* upd sourmash

* upd sourmash

* use new try_into() and eliminate several clone()s

* refactor a bit more

* deallocate collection?

* upd sourmash

* cargo fmt

* fix merge foo

---------

Co-authored-by: Tessa Pierce Ward <[email protected]>
ctb and bluegenes authored Oct 15, 2024
1 parent a7b5ae0 commit 02fcbc5
Showing 26 changed files with 1,402 additions and 442 deletions.
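
The thread running through all of these files is the new `MultiCollection` type: every command now loads queries and databases through a single entry point that can wrap a zipfile, a pathlist, a standalone manifest, or a RocksDB index, and treats them uniformly. As a rough mental model, here is a hypothetical sketch; it is not the crate's actual implementation (that lives in `src/utils.rs` and its new submodule), and all names and fields are illustrative:

```rust
// Hypothetical sketch of the MultiCollection idea; the real type wraps
// sourmash::collection::Collection values rather than this placeholder.
struct Collection {
    n_sketches: usize,
}

/// Several collections (zipfile, pathlist entries, standalone manifest,
/// RocksDB index) presented as one logical set of sketches.
struct MultiCollection {
    collections: Vec<Collection>,
}

impl MultiCollection {
    /// Total number of sketches across all wrapped collections.
    fn len(&self) -> usize {
        self.collections.iter().map(|c| c.n_sketches).sum()
    }

    /// Yield (collection, record-index) pairs; the real `par_iter()`
    /// call sites below receive a `(c, _idx, record)` triple like this.
    fn iter(&self) -> impl Iterator<Item = (&Collection, usize)> + '_ {
        self.collections
            .iter()
            .flat_map(|c| (0..c.n_sketches).map(move |idx| (c, idx)))
    }
}

fn main() {
    // e.g. a zipfile with three sketches plus a pathlist that added two more
    let multi = MultiCollection {
        collections: vec![Collection { n_sketches: 3 }, Collection { n_sketches: 2 }],
    };
    eprintln!("Found {} sketches total.", multi.len());
    for (_c, idx) in multi.iter() {
        eprintln!("would load sketch #{idx} from its owning collection");
    }
}
```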
2 changes: 2 additions & 0 deletions .cargo/config.toml
```diff
@@ -0,0 +1,2 @@
+[build]
+rustdocflags = ["--document-private-items"]
```
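
A note on this new file: cargo passes `build.rustdocflags` to rustdoc whenever docs are built through it, so `cargo doc` now documents private items as well. This is what the "add cargo doc output for private fn" commit above refers to.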
2 changes: 1 addition & 1 deletion Cargo.toml
```diff
@@ -16,7 +16,7 @@ sourmash = { version = "0.16.0", features = ["branchwater"] }
 serde_json = "1.0.128"
 niffler = "2.4.0"
 log = "0.4.22"
-env_logger = { version = "0.11.5", optional = true }
+env_logger = { version = "0.11.5" }
 simple-error = "0.3.1"
 anyhow = "1.0.89"
 zip = { version = "2.0", default-features = false }
```
205 changes: 148 additions & 57 deletions doc/README.md

Large diffs are not rendered by default.

16 changes: 14 additions & 2 deletions src/fastgather.rs
```diff
@@ -33,7 +33,9 @@ pub fn fastgather(
         )
     }
     // get single query sig and minhash
-    let query_sig = query_collection.sig_for_dataset(0)?; // need this for original md5sum
+    let query_sig = query_collection.get_first_sig().expect("no queries!?");
+
+    // @CTB avoid clone?
     let query_sig_ds = query_sig.clone().select(selection)?; // downsample
     let query_mh = match query_sig_ds.minhash() {
         Some(query_mh) => query_mh,
@@ -89,7 +91,17 @@ pub fn fastgather(
     }
 
     if prefetch_output.is_some() {
-        write_prefetch(&query_sig, prefetch_output, &matchlist).ok();
+        let query_filename = query_sig.filename();
+        let query_name = query_sig.name();
+        let query_md5 = query_sig.md5sum();
+        write_prefetch(
+            query_filename,
+            query_name,
+            query_md5,
+            prefetch_output,
+            &matchlist,
+        )
+        .ok();
     }
 
     // run the gather!
```
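
Passing `query_filename`, `query_name`, and `query_md5` into `write_prefetch` by value, rather than handing over `&query_sig`, looks like part of PR #438's de-cloning theme: the writer only ever needed those three strings, so the full signature no longer has to be borrowed or cloned for the duration of the write.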
207 changes: 108 additions & 99 deletions src/fastmultigather.rs
```diff
@@ -2,7 +2,7 @@
 use anyhow::Result;
 use rayon::prelude::*;
 
-use sourmash::prelude::ToWriter;
+use sourmash::prelude::{Storage, ToWriter};
 use sourmash::{selection::Selection, signature::SigsTrait};
 
 use std::sync::atomic;
@@ -15,13 +15,14 @@ use camino::Utf8Path as PathBuf;
 use std::collections::HashSet;
 use std::fs::File;
 
+use log::trace;
+
 use sourmash::signature::Signature;
 use sourmash::sketch::minhash::KmerMinHash;
 use sourmash::sketch::Sketch;
 
 use crate::utils::{
-    consume_query_by_gather, load_collection, load_sketches, write_prefetch, PrefetchResult,
-    ReportType,
+    consume_query_by_gather, load_collection, write_prefetch, PrefetchResult, ReportType,
 };
 
 pub fn fastmultigather(
@@ -34,6 +35,8 @@ pub fn fastmultigather(
     save_matches: bool,
     create_empty_results: bool,
 ) -> Result<()> {
+    let _ = env_logger::try_init();
+
     // load query collection
     let query_collection = load_collection(
         &query_filepath,
@@ -50,8 +53,7 @@
             1
         }
     }
-    .try_into()
-    .unwrap();
+    .try_into()?;
 
     println!("threshold overlap: {} {}", threshold_hashes, threshold_bp);
 
@@ -63,123 +65,130 @@
         allow_failed_sigpaths,
     )?;
     // load against sketches into memory, downsampling on the way
-    let against = load_sketches(against_collection, selection, ReportType::Against).unwrap();
+    let against = against_collection.load_sketches(selection)?;
 
     // Iterate over all queries => do prefetch and gather!
     let processed_queries = AtomicUsize::new(0);
     let skipped_paths = AtomicUsize::new(0);
     let failed_paths = AtomicUsize::new(0);
 
-    query_collection.par_iter().for_each(|(_idx, record)| {
+    query_collection.par_iter().for_each(|(c, _idx, record)| {
         // increment counter of # of queries. q: could we instead use the _idx from par_iter(), or will it vary based on thread?
         let _i = processed_queries.fetch_add(1, atomic::Ordering::SeqCst);
         // Load query sig (downsampling happens here)
-        match query_collection.sig_from_record(record) {
+        trace!(
+            "fastmultigather query load: from:{} idx:{} loc:{}",
+            c.storage().spec(),
+            _idx,
+            record.internal_location()
+        );
+        match c.sig_from_record(record) {
             Ok(query_sig) => {
                 let name = query_sig.name();
                 let prefix = name.split(' ').next().unwrap_or_default().to_string();
                 let location = PathBuf::new(&prefix).file_name().unwrap();
-                if let Some(query_mh) = query_sig.minhash() {
-                    let mut matching_hashes = if save_matches { Some(Vec::new()) } else { None };
-                    let matchlist: BinaryHeap<PrefetchResult> = against
-                        .iter()
-                        .filter_map(|against| {
-                            let mut mm: Option<PrefetchResult> = None;
-                            if let Ok(overlap) = against.minhash.count_common(query_mh, false) {
-                                if overlap >= threshold_hashes {
-                                    if save_matches {
-                                        if let Ok(intersection) =
-                                            against.minhash.intersection(query_mh)
-                                        {
-                                            matching_hashes
-                                                .as_mut()
-                                                .unwrap()
-                                                .extend(intersection.0);
-                                        }
-                                    }
-                                    let result = PrefetchResult {
-                                        name: against.name.clone(),
-                                        md5sum: against.md5sum.clone(),
-                                        minhash: against.minhash.clone(),
-                                        location: against.location.clone(),
-                                        overlap,
-                                    };
-                                    mm = Some(result);
-                                }
-                            }
-                            mm
-                        })
-                        .collect();
-                    if !matchlist.is_empty() {
-                        let prefetch_output = format!("{}.prefetch.csv", location);
-                        let gather_output = format!("{}.gather.csv", location);
-
-                        // Save initial list of matches to prefetch output
-                        write_prefetch(&query_sig, Some(prefetch_output), &matchlist).ok();
-
-                        // Now, do the gather!
-                        consume_query_by_gather(
-                            query_sig.clone(),
-                            scaled as u64,
-                            matchlist,
-                            threshold_hashes,
-                            Some(gather_output),
-                        )
-                        .ok();
-
-                        // Save matching hashes to .sig file if save_matches is true
-                        if save_matches {
-                            if let Some(hashes) = matching_hashes {
-                                let sig_filename = format!("{}.matches.sig", name);
-                                if let Ok(mut file) = File::create(&sig_filename) {
-                                    let unique_hashes: HashSet<u64> = hashes.into_iter().collect();
-                                    let mut new_mh = KmerMinHash::new(
-                                        query_mh.scaled().try_into().unwrap(),
-                                        query_mh.ksize().try_into().unwrap(),
-                                        query_mh.hash_function().clone(),
-                                        query_mh.seed(),
-                                        false,
-                                        query_mh.num(),
-                                    );
-                                    new_mh
-                                        .add_many(&unique_hashes.into_iter().collect::<Vec<_>>())
-                                        .ok();
-                                    let mut signature = Signature::default();
-                                    signature.push(Sketch::MinHash(new_mh));
-                                    signature.set_filename(&name);
-                                    if let Err(e) = signature.to_writer(&mut file) {
-                                        eprintln!("Error writing signature file: {}", e);
-                                    }
-                                } else {
-                                    eprintln!("Error creating signature file: {}", sig_filename);
-                                }
-                            }
-                        }
-                    } else {
-                        println!("No matches to '{}'", location);
-                        if create_empty_results {
-                            let prefetch_output = format!("{}.prefetch.csv", location);
-                            let gather_output = format!("{}.gather.csv", location);
-                            // touch output files
-                            match std::fs::File::create(&prefetch_output) {
-                                Ok(_) => {}
-                                Err(e) => {
-                                    eprintln!("Failed to create empty prefetch output: {}", e)
-                                }
-                            }
-                            match std::fs::File::create(&gather_output) {
-                                Ok(_) => {}
-                                Err(e) => eprintln!("Failed to create empty gather output: {}", e),
-                            }
-                        }
-                    }
-                } else {
-                    // different warning here? Could not load sig from record??
-                    eprintln!(
-                        "WARNING: no compatible sketches in path '{}'",
-                        record.internal_location()
-                    );
-                    let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst);
-                }
+                let query_filename = query_sig.filename();
+                let query_name = query_sig.name();
+                let query_md5 = query_sig.md5sum();
+
+                let query_mh = query_sig.minhash().expect("cannot get sketch");
+                let mut matching_hashes = if save_matches { Some(Vec::new()) } else { None };
+                let matchlist: BinaryHeap<PrefetchResult> = against
+                    .iter()
+                    .filter_map(|against| {
+                        let mut mm: Option<PrefetchResult> = None;
+                        if let Ok(overlap) = against.minhash.count_common(query_mh, false) {
+                            if overlap >= threshold_hashes {
+                                if save_matches {
+                                    if let Ok(intersection) = against.minhash.intersection(query_mh)
+                                    {
+                                        matching_hashes.as_mut().unwrap().extend(intersection.0);
+                                    }
+                                }
+                                let result = PrefetchResult {
+                                    name: against.name.clone(),
+                                    md5sum: against.md5sum.clone(),
+                                    minhash: against.minhash.clone(),
+                                    location: against.location.clone(),
+                                    overlap,
+                                };
+                                mm = Some(result);
+                            }
+                        }
+                        mm
+                    })
+                    .collect();
+
+                if !matchlist.is_empty() {
+                    let prefetch_output = format!("{}.prefetch.csv", location);
+                    let gather_output = format!("{}.gather.csv", location);
+
+                    // Save initial list of matches to prefetch output
+                    write_prefetch(
+                        query_filename,
+                        query_name,
+                        query_md5,
+                        Some(prefetch_output),
+                        &matchlist,
+                    )
+                    .ok();
+
+                    // Now, do the gather!
+                    consume_query_by_gather(
+                        query_sig.clone(),
+                        scaled as u64,
+                        matchlist,
+                        threshold_hashes,
+                        Some(gather_output),
+                    )
+                    .ok();
+
+                    // Save matching hashes to .sig file if save_matches is true
+                    if save_matches {
+                        if let Some(hashes) = matching_hashes {
+                            let sig_filename = format!("{}.matches.sig", name);
+                            if let Ok(mut file) = File::create(&sig_filename) {
+                                let unique_hashes: HashSet<u64> = hashes.into_iter().collect();
+                                let mut new_mh = KmerMinHash::new(
+                                    query_mh.scaled(),
+                                    query_mh.ksize().try_into().unwrap(),
+                                    query_mh.hash_function().clone(),
+                                    query_mh.seed(),
+                                    false,
+                                    query_mh.num(),
+                                );
+                                new_mh
+                                    .add_many(&unique_hashes.into_iter().collect::<Vec<_>>())
+                                    .ok();
+                                let mut signature = Signature::default();
+                                signature.push(Sketch::MinHash(new_mh));
+                                signature.set_filename(&name);
+                                if let Err(e) = signature.to_writer(&mut file) {
+                                    eprintln!("Error writing signature file: {}", e);
+                                }
+                            } else {
+                                eprintln!("Error creating signature file: {}", sig_filename);
+                            }
+                        }
+                    }
+                } else {
+                    println!("No matches to '{}'", location);
+                    if create_empty_results {
+                        let prefetch_output = format!("{}.prefetch.csv", location);
+                        let gather_output = format!("{}.gather.csv", location);
+                        // touch output files
+                        match std::fs::File::create(&prefetch_output) {
+                            Ok(_) => {}
+                            Err(e) => {
+                                eprintln!("Failed to create empty prefetch output: {}", e)
+                            }
+                        }
+                        match std::fs::File::create(&gather_output) {
+                            Ok(_) => {}
+                            Err(e) => eprintln!("Failed to create empty gather output: {}", e),
+                        }
+                    }
+                }
             }
             Err(_) => {
```
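
Because the new `trace!` call goes through the standard `log`/`env_logger` stack initialized at the top of the function, the per-record load trace can be enabled at runtime via env_logger's usual environment convention (e.g. `RUST_LOG=trace`), which is handy for checking which storage backend each query record is actually pulled from.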
54 changes: 42 additions & 12 deletions src/index.rs
```diff
@@ -4,6 +4,7 @@ use sourmash::prelude::*;
 use std::path::Path;
 
 use crate::utils::{load_collection, ReportType};
+use sourmash::collection::{Collection, CollectionSet};
 
 pub fn index<P: AsRef<Path>>(
     siglist: String,
@@ -13,24 +14,53 @@
     allow_failed_sigpaths: bool,
     use_internal_storage: bool,
 ) -> Result<(), Box<dyn std::error::Error>> {
-    println!("Loading siglist");
+    eprintln!("Loading sketches from {}", siglist);
 
-    let collection = load_collection(
+    let multi = match load_collection(
         &siglist,
         selection,
         ReportType::General,
         allow_failed_sigpaths,
-    )?;
+    ) {
+        Ok(multi) => multi,
+        Err(err) => return Err(err.into()),
+    };
+    eprintln!("Found {} sketches total.", multi.len());
 
-    let mut index = RevIndex::create(
-        output.as_ref(),
-        collection.select(selection)?.try_into()?,
-        colors,
-    )?;
+    // Try to convert it into a Collection and then CollectionSet.
+    let collection = match Collection::try_from(multi.clone()) {
+        // conversion worked!
+        Ok(c) => {
+            let cs: CollectionSet = c.select(selection)?.try_into()?;
+            Ok(cs)
+        }
+        // conversion failed; can we/should we load it into memory?
+        Err(_) => {
+            if use_internal_storage {
+                eprintln!("WARNING: loading all sketches into memory in order to index.");
+                eprintln!("See 'index' documentation for details.");
+                let c: Collection = multi.load_all_sigs(selection)?;
+                let cs: CollectionSet = c.try_into()?;
+                Ok(cs)
+            } else {
+                Err(
+                    anyhow::anyhow!("cannot index this type of collection with external storage")
+                        .into(),
+                )
+            }
+        }
+    };
 
-    if use_internal_storage {
-        index.internalize_storage()?;
-    }
-
-    Ok(())
+    match collection {
+        Ok(collection) => {
+            eprintln!("Indexing {} sketches.", collection.len());
+            let mut index = RevIndex::create(output.as_ref(), collection, colors)?;
+
+            if use_internal_storage {
+                index.internalize_storage()?;
+            }
+            Ok(())
+        }
+        Err(e) => Err(e),
+    }
 }
```
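
The rewritten `index` command now has two paths: a `MultiCollection` that is really one homogeneous collection (a single zipfile, say) is converted straight into a `CollectionSet` and indexed as-is, while one spanning multiple sources can only be indexed by first loading every sketch into memory via `load_all_sigs` and internalizing storage, hence the warning. With internal storage disabled, the mixed case is rejected outright, presumably because an index holding external references to scattered source files would be fragile. The doc/README.md changes above cover the user-facing side of this.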