Skip to content

Commit

Permalink
MRG: create MultiCollection for collections that span multiple files (
Browse files Browse the repository at this point in the history
#434)

* preliminary victory

* compiles and mostly runs

* cleanup, split to new module

* cleanup and comment

* more cleanup of diff

* cargo fmt

* fix fmt

* restore n_failed

* comment failing test

* cleanup and de-vec

* create module/submodule structure

* comment for later

* get rid of vec

* beg for help

* cleanup and doc
  • Loading branch information
ctb authored Aug 20, 2024
1 parent c311a69 commit 28b43d8
Show file tree
Hide file tree
Showing 8 changed files with 281 additions and 203 deletions.
2 changes: 1 addition & 1 deletion src/fastgather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub fn fastgather(
)
}
// get single query sig and minhash
let query_sig = query_collection.sig_for_dataset(0)?; // need this for original md5sum
let query_sig = query_collection.get_first_sig().unwrap();
let query_sig_ds = query_sig.clone().select(selection)?; // downsample
let query_mh = match query_sig_ds.minhash() {
Some(query_mh) => query_mh,
Expand Down
4 changes: 2 additions & 2 deletions src/fastmultigather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ pub fn fastmultigather(
let skipped_paths = AtomicUsize::new(0);
let failed_paths = AtomicUsize::new(0);

query_collection.par_iter().for_each(|(_idx, record)| {
query_collection.par_iter().for_each(|(c, _idx, record)| {
// increment counter of # of queries. q: could we instead use the _idx from par_iter(), or will it vary based on thread?
let _i = processed_queries.fetch_add(1, atomic::Ordering::SeqCst);
// Load query sig (downsampling happens here)
match query_collection.sig_from_record(record) {
match c.sig_from_record(record) {
Ok(query_sig) => {
let name = query_sig.name();
let prefix = name.split(' ').next().unwrap_or_default().to_string();
Expand Down
4 changes: 2 additions & 2 deletions src/manysearch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub fn manysearch(

let send = against_collection
.par_iter()
.filter_map(|(_idx, record)| {
.filter_map(|(coll, _idx, record)| {
let i = processed_sigs.fetch_add(1, atomic::Ordering::SeqCst);
if i % 1000 == 0 && i > 0 {
eprintln!("Processed {} search sigs", i);
Expand All @@ -67,7 +67,7 @@ pub fn manysearch(
let mut results = vec![];

// against downsampling happens here
match against_collection.sig_from_record(record) {
match coll.sig_from_record(record) {
Ok(against_sig) => {
if let Some(against_mh) = against_sig.minhash() {
for query in query_sketchlist.iter() {
Expand Down
4 changes: 2 additions & 2 deletions src/mastiff_manygather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ pub fn mastiff_manygather(

let send = query_collection
.par_iter()
.filter_map(|(_idx, record)| {
.filter_map(|(coll, _idx, record)| {
let threshold = threshold_bp / selection.scaled()? as usize;
let ksize = selection.ksize()?;

// query downsampling happens here
match query_collection.sig_from_record(record) {
match coll.sig_from_record(record) {
Ok(query_sig) => {
let mut results = vec![];
if let Some(query_mh) = query_sig.minhash() {
Expand Down
4 changes: 2 additions & 2 deletions src/mastiff_manysearch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ pub fn mastiff_manysearch(

let send_result = query_collection
.par_iter()
.filter_map(|(_idx, record)| {
.filter_map(|(coll, _idx, record)| {
let i = processed_sigs.fetch_add(1, atomic::Ordering::SeqCst);
if i % 1000 == 0 && i > 0 {
eprintln!("Processed {} search sigs", i);
}

let mut results = vec![];
// query downsample happens here
match query_collection.sig_from_record(record) {
match coll.sig_from_record(record) {
Ok(query_sig) => {
if let Some(query_mh) = query_sig.minhash() {
let query_size = query_mh.size();
Expand Down
3 changes: 2 additions & 1 deletion src/python/tests/test_fastgather.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,8 @@ def test_simple_hp(runtmp):


def test_indexed_against(runtmp, capfd):
# do not accept rocksdb for now
return
# do not accept rocksdb for now @CTB we do now!!
query = get_test_data('SRR606249.sig.gz')
against_list = runtmp.output('against.txt')

Expand Down
Loading

0 comments on commit 28b43d8

Please sign in to comment.