Skip to content

Commit

Permalink
bring back update
Browse files Browse the repository at this point in the history
  • Loading branch information
luizirber committed Sep 17, 2023
1 parent 25300fa commit 713d7bf
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 65 deletions.
11 changes: 10 additions & 1 deletion src/core/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::manifest::{Manifest, Record};
use crate::prelude::*;
use crate::signature::Signature;
use crate::storage::{FSStorage, InnerStorage, MemStorage, SigStore, Storage, ZipStorage};
use crate::Result;
use crate::{Error, Result};

#[cfg(feature = "parallel")]
use rayon::prelude::*;
Expand Down Expand Up @@ -95,6 +95,15 @@ impl Collection {
&self.storage
}

pub fn check_superset(&self, other: &Collection) -> Result<usize> {
self.iter()
.zip(other.iter())
.all(|((id1, rec1), (id2, rec2))| id1 == id2 && rec1 == rec2)
.then(|| self.len())
// TODO: right error here
.ok_or(Error::MismatchKSizes)
}

pub fn from_zipfile<P: AsRef<Path>>(zipfile: P) -> Result<Self> {
let storage = ZipStorage::from_file(zipfile)?;
// Load manifest from standard location in zipstorage
Expand Down
68 changes: 16 additions & 52 deletions src/core/src/index/revindex/disk_revindex.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::hash::{BuildHasher, BuildHasherDefault, Hash, Hasher};
use std::path::{Path, PathBuf};
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

Expand Down Expand Up @@ -62,7 +62,7 @@ pub fn repair(path: &Path) {
*/

impl RevIndex {
pub fn create(path: &Path, collection: CollectionSet) -> module::RevIndex {
pub fn create(path: &Path, collection: CollectionSet) -> Result<module::RevIndex> {
let mut opts = module::RevIndex::db_options();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
Expand Down Expand Up @@ -95,7 +95,7 @@ impl RevIndex {
index.compact();
info!("Processed {} reference sigs", processed_sigs.into_inner());

module::RevIndex::Plain(index)
Ok(module::RevIndex::Plain(index))
}

pub fn open<P: AsRef<Path>>(path: P, read_only: bool) -> Result<module::RevIndex> {
Expand Down Expand Up @@ -374,60 +374,20 @@ impl RevIndexOps for RevIndex {
Ok(matches)
}

fn update(
&self,
_index_sigs: Vec<PathBuf>,
_template: &Sketch,
_threshold: f64,
_save_paths: bool,
) {
todo!()
/*
use byteorder::ReadBytesExt;
if !save_paths {
todo!("only supports with save_paths=True for now");
}
let cf_sigs = self.db.cf_handle(SIGS).unwrap();
let iter = self.db.iterator_cf(&cf_sigs, rocksdb::IteratorMode::Start);
info!("Verifying existing sigs");
// verify data match up to this point
let mut max_dataset_id = 0;
let to_skip = iter
.map(|result| {
let (key, value) = result.unwrap();
let current_dataset_id = (&key[..]).read_u64::<LittleEndian>().unwrap();
let filename = &index_sigs[current_dataset_id as usize];
let sig_data = SignatureData::from_slice(&value).unwrap();
match sig_data {
SignatureData::External(sig) => {
assert_eq!(sig, filename.as_os_str().to_str().unwrap().to_string())
}
SignatureData::Empty => (),
SignatureData::Internal(_) => {
todo!("only supports with save_paths=True for now")
}
};
max_dataset_id = max_dataset_id.max(current_dataset_id);
})
.count();
max_dataset_id += 1;
assert_eq!(max_dataset_id as usize, to_skip);
fn update(mut self, collection: CollectionSet) -> Result<module::RevIndex> {
// TODO: verify new collection manifest is a superset of current one,
// and the initial chunk is the same
let to_skip = self.collection.check_superset(&collection)?;

// process the remainder
let processed_sigs = AtomicUsize::new(0);

index_sigs
self.collection = Arc::new(collection);

self.collection
.par_iter()
.skip(to_skip)
.enumerate()
.for_each(|(i, filename)| {
let dataset_id = i + to_skip;
.for_each(|(dataset_id, _)| {
let i = processed_sigs.fetch_add(1, Ordering::SeqCst);
if i % 1000 == 0 {
info!("Processed {} reference sigs", i);
Expand All @@ -436,11 +396,15 @@ impl RevIndexOps for RevIndex {
self.map_hashes_colors(dataset_id as Idx);
});

info!("Compact SSTs");
self.compact();

info!(
"Processed additional {} reference sigs",
processed_sigs.into_inner()
);
*/

Ok(module::RevIndex::Plain(self))
}

fn check(&self, quick: bool) {
Expand Down
28 changes: 17 additions & 11 deletions src/core/src/index/revindex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub mod mem_revindex;

use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::path::{Path, PathBuf};
use std::path::Path;
use std::sync::Arc;

use byteorder::{LittleEndian, WriteBytesExt};
Expand Down Expand Up @@ -56,7 +56,9 @@ pub trait RevIndexOps {
query: &KmerMinHash,
) -> (SigCounter, QueryColors, HashToColor);

fn update(&self, index_sigs: Vec<PathBuf>, template: &Sketch, threshold: f64, save_paths: bool);
fn update(self, collection: CollectionSet) -> Result<RevIndex>
where
Self: Sized;

fn compact(&self);

Expand Down Expand Up @@ -165,7 +167,11 @@ impl RevIndex {
}
*/

pub fn create<P: AsRef<Path>>(index: P, collection: CollectionSet, colors: bool) -> Self {
pub fn create<P: AsRef<Path>>(
index: P,
collection: CollectionSet,
colors: bool,
) -> Result<Self> {
if colors {
todo!() //color_revindex::ColorRevIndex::create(index)
} else {
Expand Down Expand Up @@ -518,7 +524,7 @@ mod test {

let collection =
Collection::from_paths(&siglist)?.select(&Selection::from_template(&template))?;
let index = RevIndex::create(output.path(), collection.try_into()?, false);
let index = RevIndex::create(output.path(), collection.try_into()?, false)?;

let counter = index.counter_for_query(&query);
let matches = index.matches_from_counter(counter, 0);
Expand All @@ -528,7 +534,6 @@ mod test {
Ok(())
}

/*
#[test]
fn revindex_update() -> Result<()> {
let mut basedir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
Expand All @@ -547,8 +552,9 @@ mod test {

let mut new_siglist = siglist.clone();
{
let index = RevIndex::create(output.path(), false);
index.index(siglist, &template, 0., true);
let collection =
Collection::from_paths(&siglist)?.select(&Selection::from_template(&template))?;
RevIndex::create(output.path(), collection.try_into()?, false)?;
}

let mut filename = basedir.clone();
Expand All @@ -564,18 +570,18 @@ mod test {
}
let query = query.unwrap();

let index = RevIndex::open(output.path(), false);
index.update(new_siglist, &template, 0., true);
let new_collection =
Collection::from_paths(&new_siglist)?.select(&Selection::from_template(&template))?;
let index = RevIndex::open(output.path(), false)?.update(new_collection.try_into()?)?;

let counter = index.counter_for_query(&query);
let matches = index.matches_from_counter(counter, 0);

assert!(matches[0].0.ends_with("/genome-s12.fa.gz.sig"));
assert!(matches[0].0.ends_with("/genome-s12.fa.gz"));
assert_eq!(matches[0].1, 45);

Ok(())
}
*/

#[test]
fn revindex_load_and_gather() -> Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion src/core/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::signature::{Signature, SigsTrait};
use crate::sketch::Sketch;
use crate::Result;

#[derive(Debug, Serialize, Deserialize, Clone, CopyGetters, Getters, Setters)]
#[derive(Debug, Serialize, Deserialize, Clone, CopyGetters, Getters, Setters, PartialEq)]
pub struct Record {
#[getset(get = "pub", set = "pub")]
internal_location: PathBuf,
Expand Down

0 comments on commit 713d7bf

Please sign in to comment.