Skip to content

Commit

Permalink
make storage and manifest private in collection
Browse files Browse the repository at this point in the history
  • Loading branch information
luizirber committed Sep 17, 2023
1 parent 84ce2bf commit e442b17
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 104 deletions.
39 changes: 37 additions & 2 deletions src/core/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ use crate::signature::Signature;
use crate::storage::{FSStorage, InnerStorage, MemStorage, SigStore, Storage, ZipStorage};
use crate::Result;

#[cfg(feature = "parallel")]
use rayon::prelude::*;

pub struct Collection {
pub(crate) manifest: Manifest,
pub(crate) storage: InnerStorage,
manifest: Manifest,
storage: InnerStorage,
}

pub struct CollectionSet {
Expand Down Expand Up @@ -60,6 +63,34 @@ impl CollectionSet {
}

impl Collection {
pub fn new(manifest: Manifest, storage: InnerStorage) -> Self {
Self { manifest, storage }
}

pub fn iter(&self) -> impl Iterator<Item = (Idx, &Record)> {
self.manifest.iter().enumerate().map(|(i, r)| (i as Idx, r))
}

#[cfg(feature = "parallel")]
pub fn par_iter(&self) -> impl IndexedParallelIterator<Item = (Idx, &Record)> {
self.manifest
.par_iter()
.enumerate()
.map(|(i, r)| (i as Idx, r))
}

pub fn len(&self) -> usize {
self.manifest.len()
}

pub fn manifest(&self) -> &Manifest {
&self.manifest
}

pub fn storage(&self) -> &InnerStorage {
&self.storage
}

pub fn from_zipfile<P: AsRef<Path>>(zipfile: P) -> Result<Self> {
let storage = ZipStorage::from_file(zipfile)?;
// Load manifest from standard location in zipstorage
Expand Down Expand Up @@ -119,6 +150,10 @@ impl Collection {
})
}

pub fn record_for_dataset(&self, dataset_id: Idx) -> Result<&Record> {
Ok(&self.manifest[dataset_id as usize])
}

pub fn sig_for_dataset(&self, dataset_id: Idx) -> Result<SigStore> {
let match_path = if self.manifest.is_empty() {
""
Expand Down
85 changes: 24 additions & 61 deletions src/core/src/index/linear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@ use rayon::prelude::*;
use crate::collection::CollectionSet;
use crate::encodings::Idx;
use crate::index::{GatherResult, Index, Selection, SigCounter};
use crate::manifest::Manifest;
use crate::selection::Select;
use crate::signature::{Signature, SigsTrait};
use crate::sketch::minhash::KmerMinHash;
use crate::sketch::Sketch;
use crate::storage::{InnerStorage, SigStore, Storage};
use crate::storage::SigStore;
use crate::Result;

pub struct LinearIndex {
Expand Down Expand Up @@ -46,53 +45,32 @@ impl LinearIndex {
}

pub fn location(&self) -> Option<String> {
if let Some(_storage) = &self.storage() {
// storage.path()
unimplemented!()
} else {
None
}
}

pub fn storage(&self) -> Option<InnerStorage> {
Some(self.collection.storage.clone())
unimplemented!()
}

pub fn counter_for_query(&self, query: &KmerMinHash) -> SigCounter {
let processed_sigs = AtomicUsize::new(0);

let search_sigs: Vec<_> = self
.collection
.manifest
.internal_locations()
.map(PathBuf::from)
.collect();

let template = self.template();

#[cfg(feature = "parallel")]
let sig_iter = search_sigs.par_iter();
let sig_iter = self.collection.par_iter();

#[cfg(not(feature = "parallel"))]
let sig_iter = search_sigs.iter();
let sig_iter = self.collection.iter();

let counters = sig_iter.filter_map(|(dataset_id, record)| {
let filename = record.internal_location();

let counters = sig_iter.enumerate().filter_map(|(dataset_id, filename)| {
let i = processed_sigs.fetch_add(1, Ordering::SeqCst);
if i % 1000 == 0 {
info!("Processed {} reference sigs", i);
}

let search_sig = if let Some(storage) = &self.storage() {
let sig_data = storage
.load(filename.as_str())
.unwrap_or_else(|_| panic!("error loading {:?}", filename));

Signature::from_reader(sig_data.as_slice())
} else {
Signature::from_path(filename)
}
.unwrap_or_else(|_| panic!("Error processing {:?}", filename))
.swap_remove(0);
let search_sig = self
.collection
.sig_for_dataset(dataset_id)
.unwrap_or_else(|_| panic!("error loading {:?}", filename));

let mut search_mh = None;
if let Some(Sketch::MinHash(mh)) = search_sig.select_sketch(template) {
Expand Down Expand Up @@ -147,7 +125,8 @@ impl LinearIndex {
for (dataset_id, size) in counter.most_common() {
if size >= threshold {
matches.push(
self.collection.manifest[dataset_id as usize]
self.collection
.record_for_dataset(dataset_id)?
.internal_location()
.to_string(),
);
Expand All @@ -165,14 +144,11 @@ impl LinearIndex {
query: &KmerMinHash,
round: usize,
) -> Result<GatherResult> {
let match_path = if self.collection.manifest.is_empty() {
""
} else {
self.collection.manifest[dataset_id as usize]
.internal_location()
.as_str()
}
.into();
let match_path = self
.collection
.record_for_dataset(dataset_id)?
.internal_location()
.into();
let match_sig = self.collection.sig_for_dataset(dataset_id)?;
let result = self.stats_for_match(&match_sig, query, match_size, match_path, round)?;
Ok(result)
Expand Down Expand Up @@ -289,18 +265,8 @@ impl LinearIndex {
Ok(matches)
}

pub fn manifest(&self) -> Manifest {
self.collection.manifest.clone()
}

pub fn set_manifest(&mut self, new_manifest: Manifest) -> Result<()> {
self.collection.manifest = new_manifest;
Ok(())
}

pub fn signatures_iter(&self) -> impl Iterator<Item = SigStore> + '_ {
// FIXME temp solution, must find better one!
(0..self.collection.manifest.len()).map(move |dataset_id| {
(0..self.collection.len()).map(move |dataset_id| {
self.collection
.sig_for_dataset(dataset_id as Idx)
.expect("error loading sig")
Expand Down Expand Up @@ -339,19 +305,16 @@ impl<'a> Index<'a> for LinearIndex {
}

fn len(&self) -> usize {
self.collection.manifest.len()
self.collection.len()
}

fn signatures(&self) -> Vec<Self::Item> {
self.collection()
.manifest
.internal_locations()
.map(PathBuf::from)
.map(|p| {
.iter()
.map(|(i, p)| {
self.collection()
.storage
.load_sig(p.as_str())
.unwrap_or_else(|_| panic!("Error processing {:?}", p))
.sig_for_dataset(i as Idx)
.unwrap_or_else(|_| panic!("Error processing {}", p.internal_location()))
})
.collect()
}
Expand Down
30 changes: 14 additions & 16 deletions src/core/src/index/revindex/disk_revindex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,14 @@ impl RevIndex {
collection: Arc::new(collection),
};

index
.collection
.manifest
.par_iter()
.enumerate()
.for_each(|(dataset_id, _)| {
let i = processed_sigs.fetch_add(1, Ordering::SeqCst);
if i % 1000 == 0 {
info!("Processed {} reference sigs", i);
}
index.collection.par_iter().for_each(|(dataset_id, _)| {
let i = processed_sigs.fetch_add(1, Ordering::SeqCst);
if i % 1000 == 0 {
info!("Processed {} reference sigs", i);
}

index.map_hashes_colors(dataset_id as Idx);
});
index.map_hashes_colors(dataset_id as Idx);
});

index.save_collection().expect("Error saving collection");

Expand Down Expand Up @@ -143,7 +138,7 @@ impl RevIndex {
InnerStorage::from_spec(spec)?
};

Collection { manifest, storage }.try_into()
Collection::new(manifest, storage).try_into()
}

fn save_collection(&self) -> Result<()> {
Expand All @@ -152,12 +147,12 @@ impl RevIndex {
// write manifest
let mut wtr = vec![];
{
self.collection.manifest.to_writer(&mut wtr)?;
self.collection.manifest().to_writer(&mut wtr)?;
}
self.db.put_cf(&cf_metadata, MANIFEST, &wtr[..])?;

// write storage spec
let spec = self.collection.storage.spec();
let spec = self.collection.storage().spec();

// TODO: check if spec if memstorage, would probably have to
// save into rocksdb in that case!
Expand Down Expand Up @@ -269,7 +264,10 @@ impl RevIndexOps for RevIndex {
.into_iter()
.filter_map(|(dataset_id, size)| {
if size >= threshold {
let row = &self.collection.manifest[dataset_id as usize];
let row = &self
.collection
.record_for_dataset(dataset_id)
.expect("dataset not found");
Some((row.name().into(), size))
} else {
None
Expand Down
36 changes: 13 additions & 23 deletions src/core/src/index/revindex/mem_revindex.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,21 @@
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};

use camino::Utf8Path as Path;
use camino::Utf8PathBuf as PathBuf;
use log::{debug, info};
use nohash_hasher::BuildNoHashHasher;
use serde::{Deserialize, Serialize};

#[cfg(feature = "parallel")]
use rayon::prelude::*;

use crate::collection::Collection;
use crate::encodings::{Color, Colors, Idx};
use crate::encodings::{Colors, Idx};
use crate::index::linear::LinearIndex;
use crate::index::revindex::HashToColor;
use crate::index::{GatherResult, Index, SigCounter};
use crate::prelude::*;
use crate::signature::{Signature, SigsTrait};
use crate::sketch::minhash::KmerMinHash;
use crate::sketch::Sketch;
use crate::storage::Storage;
use crate::HashIntoType;
use crate::Result;

pub struct RevIndex {
Expand All @@ -38,34 +33,26 @@ impl LinearIndex {
) -> RevIndex {
let processed_sigs = AtomicUsize::new(0);

let search_sigs: Vec<_> = self
.collection()
.manifest
.internal_locations()
.map(PathBuf::from)
.collect();

#[cfg(feature = "parallel")]
let sig_iter = search_sigs.par_iter();
let sig_iter = self.collection().par_iter();

#[cfg(not(feature = "parallel"))]
let sig_iter = search_sigs.iter();
let sig_iter = self.collection().iter();

let filtered_sigs = sig_iter.enumerate().filter_map(|(dataset_id, filename)| {
let filtered_sigs = sig_iter.enumerate().filter_map(|(dataset_id, _)| {
let i = processed_sigs.fetch_add(1, Ordering::SeqCst);
if i % 1000 == 0 {
info!("Processed {} reference sigs", i);
}

let search_sig = self
.collection()
.storage
.load_sig(filename.as_str())
.unwrap_or_else(|_| panic!("Error processing {:?}", filename))
.sig_for_dataset(dataset_id as Idx)
.expect("Error loading sig")
.into();

RevIndex::map_hashes_colors(
dataset_id,
dataset_id as Idx,
&search_sig,
queries,
&merged_query,
Expand Down Expand Up @@ -160,7 +147,7 @@ impl RevIndex {
}

fn map_hashes_colors(
dataset_id: usize,
dataset_id: Idx,
search_sig: &Signature,
queries: Option<&[KmerMinHash]>,
merged_query: &Option<KmerMinHash>,
Expand Down Expand Up @@ -275,8 +262,11 @@ impl RevIndex {
let match_size = if size >= threshold { size } else { break };

let match_sig = self.linear.sig_for_dataset(dataset_id)?;
let match_path =
self.linear.collection().manifest[dataset_id as usize].internal_location();
let match_path = self
.linear
.collection()
.record_for_dataset(dataset_id)?
.internal_location();

let mut match_mh = None;
if let Some(Sketch::MinHash(mh)) = match_sig.select_sketch(self.linear.template()) {
Expand Down
4 changes: 2 additions & 2 deletions src/core/src/index/revindex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,11 @@ impl HashToColor {
self.0.is_empty()
}

fn add_to(&mut self, colors: &mut Colors, dataset_id: usize, matched_hashes: Vec<u64>) {
fn add_to(&mut self, colors: &mut Colors, dataset_id: Idx, matched_hashes: Vec<u64>) {
let mut color = None;

matched_hashes.into_iter().for_each(|hash| {
color = Some(colors.update(color, &[dataset_id as Idx]).unwrap());
color = Some(colors.update(color, &[dataset_id]).unwrap());
self.0.insert(hash, color.unwrap());
});
}
Expand Down

0 comments on commit e442b17

Please sign in to comment.