Skip to content

Commit

Permalink
Misc Rust updates to core (#3297)
Browse files Browse the repository at this point in the history
A few useful things from the branchwater plugin perspective:
- add getters to `num`, `scaled`, and `n_hashes` in `Record`
- add misc documentation, based in part on descriptions in #2230
- add `#[derive Clone]` to `Collection` and `CollectionSet`
- add `Collection::from_rocksdb(...)`

---------

Co-authored-by: Luiz Irber <[email protected]>
Co-authored-by: Luiz Irber <[email protected]>
  • Loading branch information
3 people authored Aug 19, 2024
1 parent c621bd5 commit d90c507
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 6 deletions.
68 changes: 68 additions & 0 deletions src/core/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,17 @@ use crate::{Error, Result};
#[cfg(feature = "parallel")]
use rayon::prelude::*;

/// a Manifest and Storage, combined. Can contain any collection of signatures.
#[derive(Clone)]
pub struct Collection {
manifest: Manifest,
storage: InnerStorage,
}

/// A consistent collection of signatures. Can be created using `select`.
#[derive(Clone)]
pub struct CollectionSet {
collection: Collection,
}
Expand Down Expand Up @@ -129,6 +135,17 @@ impl Collection {
})
}

#[cfg(all(feature = "branchwater", not(target_arch = "wasm32")))]
pub fn from_rocksdb<P: AsRef<Path>>(dirname: P) -> Result<Self> {
use crate::index::revindex::{RevIndex, RevIndexOps};

let path = dirname.as_ref().as_str().to_string();
let index = RevIndex::open(path, true, None)?;
let collection: Collection = index.collection().clone().into_inner();

Ok(collection)
}

pub fn from_sigs(sigs: Vec<Signature>) -> Result<Self> {
let storage = MemStorage::new();

Expand Down Expand Up @@ -219,6 +236,7 @@ mod test {
use crate::prelude::Select;
use crate::selection::Selection;
use crate::signature::Signature;
use crate::Result;

Check warning on line 239 in src/core/src/collection.rs

View workflow job for this annotation

GitHub Actions / test (stable)

unused import: `crate::Result`

Check warning on line 239 in src/core/src/collection.rs

View workflow job for this annotation

GitHub Actions / test (beta)

unused import: `crate::Result`

Check warning on line 239 in src/core/src/collection.rs

View workflow job for this annotation

GitHub Actions / test (macos)

unused import: `crate::Result`

Check warning on line 239 in src/core/src/collection.rs

View workflow job for this annotation

GitHub Actions / test (windows)

unused import: `crate::Result`

#[test]
fn sigstore_selection_with_downsample() {
Expand Down Expand Up @@ -416,4 +434,54 @@ mod test {
assert_eq!(this_mh.scaled(), 100);
}
}

#[test]
#[cfg(all(feature = "branchwater", not(target_arch = "wasm32")))]
fn collection_from_rocksdb_storage() -> Result<()> {
use crate::index::revindex::{RevIndex, RevIndexOps};
use camino::Utf8PathBuf as PathBuf;
use tempfile::TempDir;

let basedir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));

let mut zip_collection = basedir.clone();
zip_collection.push("../../tests/test-data/track_abund/track_abund.zip");

let outdir = TempDir::new()?;

let zip_copy = PathBuf::from(
outdir
.path()
.join("sigs.zip")
.into_os_string()
.into_string()
.unwrap(),
);
std::fs::copy(zip_collection, zip_copy.as_path())?;

let selection = Selection::builder().ksize(31).scaled(10000).build();
let collection = Collection::from_zipfile(zip_copy.as_path())?.select(&selection)?;
let output: PathBuf = outdir.path().join("index").try_into().unwrap();

// Step 1: create an index
let index = RevIndex::create(output.as_path(), collection.clone().try_into()?, false)?;

// Step 2: internalize the storage for the index
{
let mut index = index;
index
.internalize_storage()
.expect("Error internalizing storage");
}

// Step 3: Create a new collection from rocksdb
let new_collection = Collection::from_rocksdb(output.as_path())?;

// Step 4: assert all content is the same
for (a, b) in collection.iter().zip(new_collection.iter()) {
assert_eq!(a, b);
}

Ok(())
}
}
2 changes: 1 addition & 1 deletion src/core/src/from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ impl From<MashSketcher> for KmerMinHash {

let mut new_mh = KmerMinHash::new(
0,
values.get(0).unwrap().kmer.len() as u32,
values.first().unwrap().kmer.len() as u32,
HashFunctions::Murmur64Dna,
42,
true,
Expand Down
2 changes: 2 additions & 0 deletions src/core/src/index/linear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use crate::sketch::Sketch;
use crate::storage::SigStore;
use crate::Result;

/// Supports parallel search without a particular index.
pub struct LinearIndex {
collection: CollectionSet,
template: Sketch,
Expand Down
7 changes: 4 additions & 3 deletions src/core/src/index/revindex/disk_revindex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ impl RevIndexOps for RevIndex {
orig_query: &KmerMinHash,
selection: Option<Selection>,
) -> Result<Vec<GatherResult>> {
let mut match_size = usize::max_value();
let mut match_size = usize::MAX;
let mut matches = vec![];
let mut query = KmerMinHashBTree::from(orig_query.clone());
let mut sum_weighted_found = 0;
Expand Down Expand Up @@ -554,8 +554,9 @@ impl RevIndexOps for RevIndex {
// Using unchecked version because we just used the manifest
// above to make sure the storage is still consistent
unsafe {
Arc::get_mut(&mut self.collection)
.map(|v| v.set_storage_unchecked(InnerStorage::new(new_storage)));
if let Some(v) = Arc::get_mut(&mut self.collection) {
v.set_storage_unchecked(InnerStorage::new(new_storage))
}
}

// write storage spec
Expand Down
2 changes: 1 addition & 1 deletion src/core/src/index/revindex/mem_revindex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ impl RevIndex {
threshold: usize,
query: &KmerMinHash,
) -> Result<Vec<GatherResult>> {
let mut match_size = usize::max_value();
let mut match_size = usize::MAX;
let mut matches = vec![];

while match_size > threshold && !counter.is_empty() {
Expand Down
10 changes: 10 additions & 0 deletions src/core/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use crate::signature::SigsTrait;
use crate::sketch::Sketch;
use crate::Result;

/// Individual manifest record, containing information about sketches.
#[derive(Debug, Serialize, Deserialize, Clone, CopyGetters, Getters, Setters, PartialEq, Eq)]
pub struct Record {
#[getset(get = "pub", set = "pub")]
Expand All @@ -30,8 +32,13 @@ pub struct Record {

moltype: String,

#[getset(get = "pub")]
num: u32,

#[getset(get = "pub")]
scaled: u64,

#[getset(get = "pub")]
n_hashes: usize,

#[getset(get_copy = "pub", set = "pub")]
Expand Down Expand Up @@ -73,12 +80,15 @@ where
}
}

/// A description of a collection of sketches.
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct Manifest {
records: Vec<Record>,
}

impl Record {
/// Build a Record from a Signature
pub fn from_sig(sig: &Signature, path: &str) -> Vec<Self> {
sig.iter()
.map(|sketch| {
Expand Down
1 change: 1 addition & 0 deletions src/core/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ pub struct FSStorage {
subdir: String,
}

/// Store files in a zip file.
#[ouroboros::self_referencing]
pub struct ZipStorage {
mapping: Option<memmap2::Mmap>,
Expand Down
2 changes: 1 addition & 1 deletion src/core/src/storage/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl Storage for RocksDBStorage {
fn save(&self, path: &str, content: &[u8]) -> Result<String> {
let cf_storage = self.db.cf_handle(STORAGE).unwrap();
// TODO(lirber): deal with conflict for path?
self.db.put_cf(&cf_storage, path.as_bytes(), &content[..])?;
self.db.put_cf(&cf_storage, path.as_bytes(), content)?;
Ok(path.into())
}

Expand Down

0 comments on commit d90c507

Please sign in to comment.