diff --git a/src/core/src/index/revindex/disk_revindex.rs b/src/core/src/index/revindex/disk_revindex.rs index 1a81a62086..cfb75f860e 100644 --- a/src/core/src/index/revindex/disk_revindex.rs +++ b/src/core/src/index/revindex/disk_revindex.rs @@ -281,7 +281,7 @@ impl RevIndexOps for RevIndex { let hashes_iter = query.iter_mins().map(|hash| { let mut v = vec![0_u8; 8]; (&mut v[..]) - .write_u64::(*hash) + .write_u64::(hash) .expect("error writing bytes"); (&cf_hashes, v) }); @@ -306,7 +306,7 @@ impl RevIndexOps for RevIndex { let hashes_iter = query.iter_mins().map(|hash| { let mut v = vec![0_u8; 8]; (&mut v[..]) - .write_u64::(*hash) + .write_u64::(hash) .expect("error writing bytes"); (&cf_hashes, v) }); @@ -332,7 +332,7 @@ impl RevIndexOps for RevIndex { .entry(color) .or_insert_with(|| new_vals.clone()); counter.update(new_vals); - (*k, color) + (k, color) }) }) .collect(); @@ -449,7 +449,7 @@ impl RevIndexOps for RevIndex { // Prepare counter for finding the next match by decrementing // all hashes found in the current match in other datasets // TODO: not used at the moment, so just skip. - query.remove_many(match_mh.iter_mins().copied())?; // is there a better way? + query.remove_many(match_mh.iter_mins())?; // is there a better way? // TODO: Use HashesToColors here instead. If not initialized, // build it. diff --git a/src/core/src/index/revindex/mem_revindex.rs b/src/core/src/index/revindex/mem_revindex.rs index 08b7bc56ac..85f86a246f 100644 --- a/src/core/src/index/revindex/mem_revindex.rs +++ b/src/core/src/index/revindex/mem_revindex.rs @@ -219,7 +219,7 @@ impl RevIndex { // Prepare counter for finding the next match by decrementing // all hashes found in the current match in other datasets for hash in match_mh.iter_mins() { - if let Some(color) = self.hash_to_color.get(hash) { + if let Some(color) = self.hash_to_color.get(&hash) { counter.subtract(self.colors.indices(color).cloned()); } } @@ -292,7 +292,7 @@ impl RevIndex { pub fn counter_for_query(&self, query: &KmerMinHash) -> SigCounter { query .iter_mins() - .filter_map(|hash| self.hash_to_color.get(hash)) + .filter_map(|hash| self.hash_to_color.get(&hash)) .flat_map(|color| self.colors.indices(color)) .cloned() .collect() diff --git a/src/core/src/sketch/minhash.rs b/src/core/src/sketch/minhash.rs index 438294e098..0fce9cb16d 100644 --- a/src/core/src/sketch/minhash.rs +++ b/src/core/src/sketch/minhash.rs @@ -7,6 +7,7 @@ use std::str; use std::sync::Mutex; use itertools::Itertools; +use roaring::RoaringTreemap; use serde::de::Deserializer; use serde::ser::{SerializeStruct, Serializer}; use serde::{Deserialize, Serialize}; @@ -52,7 +53,7 @@ pub struct KmerMinHash { max_hash: u64, #[builder(default)] - mins: Vec, + mins: RoaringTreemap, #[builder(default)] abunds: Option>, @@ -93,7 +94,7 @@ impl Default for KmerMinHash { hash_function: HashFunctions::Murmur64Dna, seed: 42, max_hash: 0, - mins: Vec::with_capacity(1000), + mins: Default::default(), abunds: None, md5sum: Mutex::new(None), } @@ -115,7 +116,7 @@ impl Serialize for KmerMinHash { partial.serialize_field("ksize", &self.ksize)?; partial.serialize_field("seed", &self.seed)?; partial.serialize_field("max_hash", &self.max_hash)?; - partial.serialize_field("mins", &self.mins)?; + partial.serialize_field("mins", &self.mins.iter().collect::>())?; partial.serialize_field("md5sum", &self.md5sum())?; if let Some(abunds) = &self.abunds { @@ -156,18 +157,18 @@ impl<'de> Deserialize<'de> for KmerMinHash { _ => unimplemented!(), // TODO: throw error here }; + let mut mins = RoaringTreemap::new(); // This shouldn't be necessary, but at some point we // created signatures with unordered mins =( let (mins, abunds) = if let Some(abunds) = tmpsig.abundances { let mut values: Vec<(_, _)> = tmpsig.mins.iter().zip(abunds.iter()).collect(); values.sort(); - let mins = values.iter().map(|(v, _)| **v).collect(); + mins.extend(values.iter().map(|(v, _)| **v)); let abunds = values.iter().map(|(_, v)| **v).collect(); (mins, Some(abunds)) } else { - let mut values: Vec<_> = tmpsig.mins.into_iter().collect(); - values.sort_unstable(); - (values, None) + mins.extend(tmpsig.mins.into_iter()); + (mins, None) }; Ok(KmerMinHash { @@ -192,14 +193,10 @@ impl KmerMinHash { track_abundance: bool, num: u32, ) -> KmerMinHash { - let mins = if num > 0 { - Vec::with_capacity(num as usize) - } else { - Vec::with_capacity(1000) - }; + let mins = RoaringTreemap::new(); let abunds = if track_abundance { - Some(Vec::with_capacity(mins.capacity())) + Some(Vec::with_capacity(1000)) } else { None }; @@ -311,10 +308,7 @@ impl KmerMinHash { } pub fn add_hash_with_abundance(&mut self, hash: u64, abundance: u64) { - let current_max = match self.mins.last() { - Some(&x) => x, - None => u64::MAX, - }; + let current_max = self.mins.max().unwrap_or_else(|| u64::MAX); if hash > self.max_hash && self.max_hash != 0 { // This is a scaled minhash, and we don't need to add the new hash @@ -346,54 +340,56 @@ impl KmerMinHash { if hash <= self.max_hash || hash <= current_max || (self.mins.len() as u32) < self.num { // "good" hash - within range, smaller than current entry, or // still have space available - let pos = match self.mins.binary_search(&hash) { - Ok(p) => p, - Err(p) => p, - }; + let pos = self.mins.rank(hash) as usize; - if pos == self.mins.len() { - // at end - must still be growing, we know the list won't - // get too long - self.mins.push(hash); - self.reset_md5sum(); + //dbg!((hash, pos, &self.mins, &self.abunds)); + if self.mins.contains(hash) { if let Some(ref mut abunds) = self.abunds { - abunds.push(abundance); + //dbg!("bump abundance"); + // pos == hash: hash value already in mins, inc count by abundance + abunds[pos - 1] += abundance; } - } else if self.mins[pos] != hash { + } else { // didn't find hash in mins, so inserting somewhere // in the middle; shrink list if needed. - self.mins.insert(pos, hash); + //dbg!("not contains"); + self.mins.insert(hash); if let Some(ref mut abunds) = self.abunds { abunds.insert(pos, abundance); } // is it too big now? - if self.num != 0 && self.mins.len() > (self.num as usize) { - self.mins.pop(); + if self.num != 0 && self.mins.len() > self.num.into() { + self.mins.remove(self.mins.max().unwrap()); if let Some(ref mut abunds) = self.abunds { abunds.pop(); } } self.reset_md5sum(); - } else if let Some(ref mut abunds) = self.abunds { - // pos == hash: hash value already in mins, inc count by abundance - abunds[pos] += abundance; } - } - } - pub fn set_hash_with_abundance(&mut self, hash: u64, abundance: u64) { - let mut found = false; - if let Ok(pos) = self.mins.binary_search(&hash) { - if self.mins[pos] == hash { - found = true; + /* + if pos == self.mins.len() as usize { + // at end - must still be growing, we know the list won't + // get too long + dbg!("equal"); + self.mins.push(hash); + self.reset_md5sum(); if let Some(ref mut abunds) = self.abunds { - abunds[pos] = abundance; + abunds.push(abundance); } } + */ } + } - if !found { + pub fn set_hash_with_abundance(&mut self, hash: u64, abundance: u64) { + let pos = self.mins.rank(hash) as usize; + if self.mins.contains(hash) { + if let Some(ref mut abunds) = self.abunds { + abunds[pos] = abundance; + } + } else { self.add_hash_with_abundance(hash, abundance); } } @@ -404,20 +400,19 @@ impl KmerMinHash { } pub fn remove_hash(&mut self, hash: u64) { - if let Ok(pos) = self.mins.binary_search(&hash) { - if self.mins[pos] == hash { - self.mins.remove(pos); - self.reset_md5sum(); - if let Some(ref mut abunds) = self.abunds { - abunds.remove(pos); - } + let pos = self.mins.rank(hash); + if self.mins.contains(hash) { + self.mins.remove(hash); + self.reset_md5sum(); + if let Some(ref mut abunds) = self.abunds { + abunds.remove((pos - 1) as usize); } - }; + } } pub fn remove_from(&mut self, other: &KmerMinHash) -> Result<(), Error> { for min in &other.mins { - self.remove_hash(*min); + self.remove_hash(min); } Ok(()) } @@ -431,9 +426,9 @@ impl KmerMinHash { pub fn merge(&mut self, other: &KmerMinHash) -> Result<(), Error> { self.check_compatible(other)?; - let max_size = self.mins.len() + other.mins.len(); + let max_size = (self.mins.len() + other.mins.len() - self.mins.intersection_len(&other.mins)) as usize; - let mut merged: Vec = Vec::with_capacity(max_size); + let mut merged: RoaringTreemap = Default::default(); let mut merged_abunds: Option> = if self.abunds.is_some() && other.abunds.is_some() { Some(Vec::with_capacity(max_size)) @@ -453,7 +448,7 @@ impl KmerMinHash { let value = self_value.unwrap(); match other_value { None => { - merged.push(*value); + merged.push(value); merged.extend(self_iter); if let Some(v) = merged_abunds.as_mut() { v.extend(self_abunds_iter) @@ -461,7 +456,7 @@ impl KmerMinHash { break; } Some(x) if x < value => { - merged.push(*x); + merged.push(x); other_value = other_iter.next(); if let Some(v) = other_abunds_iter.next() { if let Some(n) = merged_abunds.as_mut() { @@ -470,7 +465,7 @@ impl KmerMinHash { } } Some(x) if x == value => { - merged.push(*x); + merged.push(x); other_value = other_iter.next(); self_value = self_iter.next(); @@ -482,7 +477,7 @@ impl KmerMinHash { } } Some(x) if x > value => { - merged.push(*value); + merged.push(value); self_value = self_iter.next(); if let Some(v) = self_abunds_iter.next() { @@ -495,19 +490,21 @@ impl KmerMinHash { } } if let Some(value) = other_value { - merged.push(*value); + merged.push(value); } merged.extend(other_iter); if let Some(n) = merged_abunds.as_mut() { n.extend(other_abunds_iter) } - if merged.len() > (self.num as usize) && (self.num as usize) != 0 { - merged.truncate(self.num as usize); + if merged.len() > (self.num as u64) && self.num != 0 { + let last_pos = merged.select(self.num as u64 - 1).unwrap_or_else(|| u64::MAX); + merged.remove_range(last_pos + 1..); if let Some(v) = merged_abunds.as_mut() { v.truncate(self.num as usize) } } + assert_eq!(merged.len() as usize, merged_abunds.as_ref().map(|v| v.len()).unwrap_or_else(|| merged.len() as usize)); self.mins = merged; self.abunds = merged_abunds; @@ -517,7 +514,7 @@ impl KmerMinHash { pub fn add_from(&mut self, other: &KmerMinHash) -> Result<(), Error> { for min in &other.mins { - self.add_hash(*min); + self.add_hash(min); } Ok(()) } @@ -547,13 +544,13 @@ impl KmerMinHash { first.count_common(&downsampled_mh, false) } else { self.check_compatible(other)?; - let iter = if self.size() < other.size() { - Intersection::new(self.mins.iter(), other.mins.iter()) + let size = if self.size() < other.size() { + self.mins.intersection_len(&other.mins) } else { - Intersection::new(other.mins.iter(), self.mins.iter()) + other.mins.intersection_len(&self.mins) }; - Ok(iter.count() as u64) + Ok(size as u64) } } @@ -574,17 +571,18 @@ impl KmerMinHash { combined_mh.merge(self)?; combined_mh.merge(other)?; - let it1 = Intersection::new(self.mins.iter(), other.mins.iter()); + let it1 = Intersection::new(self.iter_mins(), other.iter_mins()); // TODO: there is probably a way to avoid this Vec here, // and pass the it1 as left in it2. - let i1: Vec = it1.cloned().collect(); - let it2 = Intersection::new(i1.iter(), combined_mh.mins.iter()); + let i1: Vec = it1.collect(); + let cmh_mins = combined_mh.mins(); + let it2 = Intersection::new(i1.iter(), cmh_mins.iter()); let common: Vec = it2.cloned().collect(); Ok((common, combined_mh.mins.len() as u64)) } else { - Ok(intersection(self.mins.iter(), other.mins.iter())) + Ok(intersection(self.iter_mins(), other.iter_mins())) } } @@ -611,12 +609,14 @@ impl KmerMinHash { // TODO: there is probably a way to avoid this Vec here, // and pass the it1 as left in it2. - let i1: Vec = it1.cloned().collect(); - let it2 = Intersection::new(i1.iter(), combined_mh.mins.iter()); + let i1: Vec = it1.collect(); + let cmh_mins = combined_mh.mins(); + let it2 = Intersection::new(i1.iter(), cmh_mins.iter()); Ok((it2.count() as u64, combined_mh.mins.len() as u64)) } else { - Ok(intersection_size(self.mins.iter(), other.mins.iter())) + Ok((self.mins.intersection_len(&other.mins), + self.mins.union_len(&other.mins))) } } @@ -652,7 +652,7 @@ impl KmerMinHash { for (i, hash) in self.mins.iter().enumerate() { while let Some((j, k)) = next_hash { - match k.cmp(hash) { + match k.cmp(&hash) { Ordering::Less => next_hash = other_iter.next(), Ordering::Equal => { // Calling `get_unchecked` here is safe since @@ -710,11 +710,11 @@ impl KmerMinHash { } pub fn mins(&self) -> Vec { - self.mins.clone() + self.mins.iter().collect() } - pub fn iter_mins(&self) -> impl Iterator { - self.mins.iter() + pub fn iter_mins(&self) -> impl Iterator + '_ { + (&self.mins).into_iter() } pub fn abunds(&self) -> Option> { @@ -744,13 +744,11 @@ impl KmerMinHash { if let Some(abunds) = &self.abunds { self.mins .iter() - .cloned() - .zip(abunds.iter().cloned()) + .zip(abunds.iter().copied()) .collect() } else { self.mins .iter() - .cloned() .zip(std::iter::repeat(1)) .collect() } @@ -760,7 +758,7 @@ impl KmerMinHash { let mut hll = HyperLogLog::with_error_rate(0.01, self.ksize()).unwrap(); for h in &self.mins { - hll.add_hash(*h) + hll.add_hash(h) } hll @@ -791,7 +789,7 @@ impl KmerMinHash { if self.abunds.is_some() { new_mh.add_many_with_abund(&self.to_vec_abunds())?; } else { - new_mh.add_many(&self.mins)?; + new_mh.add_many(self.mins.iter().collect::>().as_slice())?; } Ok(new_mh) } @@ -811,7 +809,7 @@ impl KmerMinHash { let (mins, abunds): (Vec, Vec) = self_iter .merge_join_by(abunds_from_iter, |&self_val, &(other_val, _)| { - self_val.cmp(other_val) + self_val.cmp(&other_val) }) .filter_map(|either| match either { itertools::EitherOrBoth::Both(self_val, (_other_val, &other_abund)) => { @@ -821,7 +819,7 @@ impl KmerMinHash { }) .unzip(); - self.mins = mins; + self.mins = RoaringTreemap::from_sorted_iter(mins).expect("TODO FIX BEFORE MERGING"); self.abunds = Some(abunds); self.reset_md5sum(); @@ -841,7 +839,7 @@ impl KmerMinHash { let (abundances, total_abundance): (Vec, u64) = self_iter .merge_join_by(abunds_from_iter, |&self_val, &(other_val, _)| { - self_val.cmp(other_val) + self_val.cmp(&other_val) }) .filter_map(|either| match either { itertools::EitherOrBoth::Both(_self_val, (_other_val, other_abund)) => { @@ -860,11 +858,11 @@ impl KmerMinHash { impl SigsTrait for KmerMinHash { fn size(&self) -> usize { - self.mins.len() + self.mins.len() as usize } fn to_vec(&self) -> Vec { - self.mins.clone() + self.mins.iter().collect() } fn ksize(&self) -> usize { @@ -1415,7 +1413,7 @@ impl KmerMinHashBTree { Ok((common, combined_mh.mins.len() as u64)) } else { // Intersection for scaled MinHash sketches - Ok(intersection(self.mins.iter(), other.mins.iter())) + Ok(intersection(self.mins.iter().copied(), other.mins.iter().copied())) } } @@ -1725,8 +1723,8 @@ impl From for KmerMinHashBTree { } fn intersection<'a>( - me_iter: impl Iterator, - other_iter: impl Iterator, + me_iter: impl Iterator, + other_iter: impl Iterator, ) -> (Vec, u64) { let mut me = me_iter.peekable(); let mut other = other_iter.peekable(); @@ -1748,7 +1746,7 @@ fn intersection<'a>( } Ordering::Equal => { other.next(); - common.push(***left_key); + common.push(**left_key); me.next(); union_size += 1; } diff --git a/src/core/src/sketch/nodegraph.rs b/src/core/src/sketch/nodegraph.rs index bbfef5cd0d..aae0c64253 100644 --- a/src/core/src/sketch/nodegraph.rs +++ b/src/core/src/sketch/nodegraph.rs @@ -160,7 +160,7 @@ impl Nodegraph { } pub fn matches(&self, mh: &KmerMinHash) -> usize { - mh.iter_mins().filter(|x| self.get(**x) == 1).count() + mh.iter_mins().filter(|x| self.get(*x) == 1).count() } pub fn ntables(&self) -> usize {