Skip to content

Commit

Permalink
sigh
Browse files Browse the repository at this point in the history
  • Loading branch information
ctb committed Nov 10, 2024
1 parent 2080ac3 commit 0f82329
Showing 1 changed file with 18 additions and 1 deletion.
19 changes: 18 additions & 1 deletion src/fastmultigather_rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ use sourmash::signature::SigsTrait;
use std::sync::atomic;
use std::sync::atomic::AtomicUsize;

use sourmash::sketch::minhash::KmerMinHash;
use sourmash::storage::SigStore;

use crate::utils::{
csvwriter_thread, is_revindex_database, load_collection, BranchwaterGatherResult, ReportType,
};
Expand All @@ -27,6 +30,8 @@ pub fn fastmultigather_rocksdb(
let db = RevIndex::open(index, true, None)?;
println!("Loaded DB");

let query_set_scaled = selection.scaled().expect("fiz");

let query_collection = load_collection(
&queries_file,
&selection,
Expand All @@ -50,6 +55,11 @@ pub fn fastmultigather_rocksdb(
let mut against_selection = selection;
against_selection.set_scaled(scaled);

let mut query_collection = query_collection;
if scaled != query_set_scaled {
query_collection = query_collection.select(&against_selection).expect("fiz");
}

// set up a multi-producer, single-consumer channel.
let (send, recv) =
std::sync::mpsc::sync_channel::<BranchwaterGatherResult>(rayon::current_num_threads());
Expand All @@ -71,6 +81,7 @@ pub fn fastmultigather_rocksdb(
let send = query_collection
.par_iter()
.filter_map(|(coll, _idx, record)| {
eprintln!("XXX record scaled: {}", record.scaled());
let threshold = threshold_bp / against_selection.scaled().expect("scaled is not set!?");
let ksize = against_selection.ksize().expect("ksize not set!?");

Expand All @@ -82,7 +93,13 @@ pub fn fastmultigather_rocksdb(
let query_md5 = query_sig.md5sum();

let mut results = vec![];
if let Ok(query_mh) = query_sig.try_into() {
if let Ok(query_mh) = <SigStore as TryInto<KmerMinHash>>::try_into(query_sig) {
eprintln!("selection: {}; query_mh scaled: {}",
query_set_scaled, query_mh.scaled());
let query_mh = query_mh.downsample_scaled(query_set_scaled).expect("bar");
eprintln!("XYZ selection: {}; query_mh scaled: {}",
query_set_scaled, query_mh.scaled());

let _ = processed_sigs.fetch_add(1, atomic::Ordering::SeqCst);
// Gather!
let (counter, query_colors, hash_to_color) =
Expand Down

0 comments on commit 0f82329

Please sign in to comment.