From 36815851c3f825911cbc1d32f361bd48babfee04 Mon Sep 17 00:00:00 2001 From: angelip2303 Date: Thu, 7 Mar 2024 11:22:06 +0000 Subject: [PATCH] get_predicate implemented --- .gitignore | 3 +- Cargo.toml | 2 +- examples/{serialize_bench.rs => serialize.rs} | 2 +- src/engine/array.rs | 10 ++- src/engine/chunk.rs | 30 +++++-- src/storage/layout/matrix.rs | 17 ++-- src/storage/layout/mod.rs | 8 +- src/storage/layout/tabular.rs | 6 +- src/storage/mod.rs | 7 +- src/storage/params.rs | 10 +-- tests/common/mod.rs | 6 +- tests/get_predicate_test.rs | 88 +++++++++++++++++++ tests/orientation.rs | 47 ++-------- 13 files changed, 161 insertions(+), 75 deletions(-) rename examples/{serialize_bench.rs => serialize.rs} (90%) create mode 100644 tests/get_predicate_test.rs diff --git a/.gitignore b/.gitignore index 562046a..7f51346 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ benches/*/*.nt !resources/root.zarr .vscode heaptrack.* -tests/out \ No newline at end of file +tests/out +uniprotkb_* \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index ac39d2e..95475c4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ version = "0.0.1" edition = "2021" [dependencies] -zarrs = { version = "0.12.2", default-features = false, features = [ "http", "gzip", "sharding", "opendal", "async", "ndarray" ] } +zarrs = { version = "0.12.3", default-features = false, features = [ "http", "gzip", "sharding", "opendal", "async", "ndarray" ] } clap = { version = "4.1.8", features = ["derive"] } serde_json = "1.0.108" thiserror = "1.0.50" diff --git a/examples/serialize_bench.rs b/examples/serialize.rs similarity index 90% rename from examples/serialize_bench.rs rename to examples/serialize.rs index 2e15eb6..421c892 100644 --- a/examples/serialize_bench.rs +++ b/examples/serialize.rs @@ -12,7 +12,7 @@ static ALLOCATOR: jemallocator::Jemalloc = jemallocator::Jemalloc; fn main() -> Result<(), RemoteHDTError> { let args: Vec = env::args().collect(); if args.len() <= 3 { - panic!("Usage: cargo run --example serialize_bench "); + panic!("Usage: cargo run --example serialize "); } let rdf_path = &args[1].as_str(); diff --git a/src/engine/array.rs b/src/engine/array.rs index 3a97292..0470603 100644 --- a/src/engine/array.rs +++ b/src/engine/array.rs @@ -13,8 +13,14 @@ impl EngineStrategy for ZarrArray { Ok(&matrix * self) } - fn get_second_term(&self, _value: usize) -> EngineResult { - unimplemented!() + fn get_second_term(&self, value: usize) -> EngineResult { + let mut matrix = TriMat::new((self.rows(), self.cols())); + self.iter().for_each(|(&e, (row, col))| { + if e == value { + matrix.add_triplet(row, col, value); + } + }); + Ok(matrix.to_csc()) } fn get_third_term(&self, index: usize) -> EngineResult { diff --git a/src/engine/chunk.rs b/src/engine/chunk.rs index d92901e..0f37f6c 100644 --- a/src/engine/chunk.rs +++ b/src/engine/chunk.rs @@ -2,6 +2,7 @@ use zarrs::array::Array; use zarrs::array_subset::ArraySubset; use zarrs::storage::ReadableStorageTraits; +use crate::error::EngineError; use crate::utils::columns_per_shard; use crate::utils::rows_per_shard; @@ -19,14 +20,33 @@ impl EngineStrategy> for Array { Ok(chunk.to_vec()) } - fn get_second_term(&self, _index: usize) -> EngineResult> { - unimplemented!() + fn get_second_term(&self, index: usize) -> EngineResult> { + let mut ans = Vec::new(); + let number_of_shards = match self.chunk_grid_shape() { + Some(chunk_grid) => chunk_grid[0], + None => return Err(EngineError::Operation), + }; + for i in 0..number_of_shards { + let mut shard = self.retrieve_chunk_elements::(&[i, 0])?; + shard.iter_mut().for_each(|e| { + if *e != index as u32 { + *e = 0 + } + }); + ans.append(&mut shard); + } + Ok(ans) } fn get_third_term(&self, index: usize) -> EngineResult> { + let objects = self.shape()[0]; let col = index as u64; - let shape = ArraySubset::new_with_start_end_inc(vec![0, col], vec![self.shape()[0], col])?; - let ans = self.retrieve_array_subset_elements(&shape)?; - Ok(ans) + let shape = ArraySubset::new_with_ranges(&[0..objects, col..col + 1]); + let array_subset = self.retrieve_array_subset(&shape).unwrap(); + let third_term_subset = array_subset + .windows(4) + .map(|w| u32::from_ne_bytes(w.try_into().unwrap())) + .collect::>(); + Ok(third_term_subset) } } diff --git a/src/storage/layout/matrix.rs b/src/storage/layout/matrix.rs index 69f7fd2..f2bc735 100644 --- a/src/storage/layout/matrix.rs +++ b/src/storage/layout/matrix.rs @@ -85,8 +85,11 @@ impl Layout for MatrixLayout { &self, dimensionality: &Dimensionality, ) -> StorageResult> { - let mut sharding_codec_builder = - ShardingCodecBuilder::new(vec![1, dimensionality.get_third_term_size()].try_into()?); + let mut sharding_codec_builder = ShardingCodecBuilder::new( + vec![1, dimensionality.get_third_term_size()] + .as_slice() + .try_into()?, + ); sharding_codec_builder.bytes_to_bytes_codecs(vec![Box::new(GzipCodec::new(5)?)]); Ok(Box::new(sharding_codec_builder.build())) } @@ -124,16 +127,18 @@ impl LayoutOps for MatrixLayout { &mut self, matrix: &Mutex>, first_term_index: usize, - chunk: &[usize], + chunk: &[u32], ) { chunk .iter() .enumerate() .for_each(|(third_term_idx, &second_term_idx)| { if second_term_idx != 0 { - matrix - .lock() - .add_triplet(first_term_index, third_term_idx, second_term_idx); + matrix.lock().add_triplet( + first_term_index, + third_term_idx, + second_term_idx as usize, + ); } }) } diff --git a/src/storage/layout/mod.rs b/src/storage/layout/mod.rs index a50e2a8..11657ad 100644 --- a/src/storage/layout/mod.rs +++ b/src/storage/layout/mod.rs @@ -72,12 +72,12 @@ pub trait LayoutOps { for chunk in iter { let slice = self.store_chunk_elements(chunk, columns); - arr.store_chunk_elements(&[count.load(Ordering::Relaxed), 0], slice)?; + arr.store_chunk_elements::(&[count.load(Ordering::Relaxed), 0], slice)?; count.fetch_add(1, Ordering::Relaxed); } if !remainder.is_empty() { - arr.store_array_subset_elements( + arr.store_array_subset_elements::( &ArraySubset::new_with_start_shape( vec![count.load(Ordering::Relaxed) * rows_per_shard(&arr), 0], vec![remainder.len() as u64, columns_per_shard(&arr)], @@ -123,7 +123,7 @@ pub trait LayoutOps { // of it. Once we have all the pieces processed, we will have parsed the // whole array for shard in 0..number_of_shards { - arr.retrieve_chunk_elements(&[shard, 0])? + arr.retrieve_chunk_elements::(&[shard, 0])? // We divide each shard by the number of columns, as a shard is // composed of chunks having the size of [1, number of cols] .chunks(number_of_columns) @@ -150,7 +150,7 @@ pub trait LayoutOps { &mut self, matrix: &Mutex>, first_term_idx: usize, - chunk: &[usize], + chunk: &[u32], ); fn sharding_factor(&self, dimensionality: &Dimensionality) -> usize; } diff --git a/src/storage/layout/tabular.rs b/src/storage/layout/tabular.rs index d0e8115..be6c520 100644 --- a/src/storage/layout/tabular.rs +++ b/src/storage/layout/tabular.rs @@ -84,12 +84,12 @@ impl LayoutOps for TabularLayout { fn retrieve_chunk_elements( &mut self, matrix: &Mutex>, - first_term_index: usize, // TODO: will first_term_index instead of chunk[0] do the trick? - chunk: &[usize], + _first_term_index: usize, // TODO: will first_term_index instead of chunk[0] do the trick? + chunk: &[u32], ) { matrix .lock() - .add_triplet(chunk[0], chunk[2], chunk[1] as usize); + .add_triplet(chunk[0] as usize, chunk[2] as usize, chunk[1] as usize); } fn sharding_factor(&self, dimensionality: &Dimensionality) -> usize { diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 75acf75..8035e77 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -104,8 +104,7 @@ impl Storage { // Create a group and write metadata to filesystem let group = GroupBuilder::new().build(store.clone(), "/group")?; - - let _ = group.store_metadata()?; + group.store_metadata()?; // TODO: rayon::ThreadPoolBuilder::new() // .num_threads(1) @@ -153,9 +152,9 @@ impl Storage { Ok(self) } - pub fn load<'a>( + pub fn load( &mut self, - store: Backend<'a>, + store: Backend<'_>, // threading_strategy: ThreadingStrategy, TODO: implement this ) -> StorageResult<&mut Self> { let operator = match store { diff --git a/src/storage/params.rs b/src/storage/params.rs index d453a66..a21559b 100644 --- a/src/storage/params.rs +++ b/src/storage/params.rs @@ -38,7 +38,7 @@ pub enum ReferenceSystem { pub struct Dimensionality { graph_size: Option, pub(crate) first_term_size: usize, - second_term_size: usize, + _second_term_size: usize, pub(crate) third_term_size: usize, } @@ -91,7 +91,7 @@ impl Dimensionality { ReferenceSystem::POS | ReferenceSystem::PSO => dictionary.predicates_size(), ReferenceSystem::OPS | ReferenceSystem::OSP => dictionary.objects_size(), }, - second_term_size: match dictionary.get_reference_system() { + _second_term_size: match dictionary.get_reference_system() { ReferenceSystem::PSO | ReferenceSystem::OSP => dictionary.subjects_size(), ReferenceSystem::SPO | ReferenceSystem::OPS => dictionary.predicates_size(), ReferenceSystem::SOP | ReferenceSystem::POS => dictionary.objects_size(), @@ -112,9 +112,9 @@ impl Dimensionality { self.first_term_size as u64 } - pub(crate) fn get_second_term_size(&self) -> u64 { - self.second_term_size as u64 - } + // pub(crate) fn get_second_term_size(&self) -> u64 { + // self._second_term_size as u64 + // } pub(crate) fn get_third_term_size(&self) -> u64 { self.third_term_size as u64 diff --git a/tests/common/mod.rs b/tests/common/mod.rs index daf67bd..c0d5b80 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -46,7 +46,7 @@ pub enum Subject { } impl Subject { - fn get_idx(self, dictionary: &Dictionary) -> usize { + pub(crate) fn get_idx(self, dictionary: &Dictionary) -> usize { dictionary.get_subject_idx_unchecked(self.into()) } } @@ -74,7 +74,7 @@ pub enum Predicate { } impl Predicate { - fn get_idx(self, dictionary: &Dictionary) -> usize { + pub(crate) fn get_idx(self, dictionary: &Dictionary) -> usize { dictionary.get_predicate_idx_unchecked(self.into()) } } @@ -107,7 +107,7 @@ pub enum Object { } impl Object { - fn get_idx(self, dictionary: &Dictionary) -> usize { + pub(crate) fn get_idx(self, dictionary: &Dictionary) -> usize { dictionary.get_object_idx_unchecked(self.into()) } } diff --git a/tests/get_predicate_test.rs b/tests/get_predicate_test.rs new file mode 100644 index 0000000..6352252 --- /dev/null +++ b/tests/get_predicate_test.rs @@ -0,0 +1,88 @@ +use remote_hdt::storage::layout::matrix::MatrixLayout; +use remote_hdt::storage::layout::tabular::TabularLayout; +use remote_hdt::storage::ops::Ops; +use remote_hdt::storage::ops::OpsFormat; +use remote_hdt::storage::params::Backend; +use remote_hdt::storage::params::ChunkingStrategy; +use remote_hdt::storage::params::ReferenceSystem; +use remote_hdt::storage::params::Serialization; +use remote_hdt::storage::Storage; +use sprs::TriMat; +use std::error::Error; + +mod common; + +#[test] +fn get_predicate_matrix_chunk_test() -> Result<(), Box> { + let mut storage = Storage::new(MatrixLayout, Serialization::Zarr); + + common::setup( + common::MATRIX_ZARR, + &mut storage, + ChunkingStrategy::Chunk, + ReferenceSystem::SPO, + ); + + let actual = match storage + .load(Backend::FileSystem(common::MATRIX_ZARR))? + .get_predicate(common::Predicate::InstanceOf.into())? + { + OpsFormat::Zarr(actual) => actual, + _ => unreachable!(), + }; + + if actual + == vec![ + 0, 0, 5, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 5, 0, 0, 0, + ] + { + Ok(()) + } else { + Err(String::from("Expected and actual results are not equals").into()) + } +} + +#[test] +fn get_predicate_tabular_test() -> Result<(), Box> { + let mut storage = Storage::new(TabularLayout, Serialization::Sparse); + + common::setup( + common::TABULAR_ZARR, + &mut storage, + ChunkingStrategy::Chunk, + ReferenceSystem::SPO, + ); + + let actual = match storage + .load(Backend::FileSystem(common::TABULAR_ZARR))? + .get_predicate(common::Predicate::InstanceOf.into())? + { + OpsFormat::SparseArray(actual) => actual, + _ => unreachable!(), + }; + + let mut expected = TriMat::new((4, 9)); + expected.add_triplet( + common::Subject::Alan.get_idx(&storage.get_dictionary()), + common::Object::Human.get_idx(&storage.get_dictionary()), + common::Predicate::InstanceOf.get_idx(&storage.get_dictionary()), + ); + expected.add_triplet( + common::Subject::Wilmslow.get_idx(&storage.get_dictionary()), + common::Object::Town.get_idx(&storage.get_dictionary()), + common::Predicate::InstanceOf.get_idx(&storage.get_dictionary()), + ); + expected.add_triplet( + common::Subject::Bombe.get_idx(&storage.get_dictionary()), + common::Object::Computer.get_idx(&storage.get_dictionary()), + common::Predicate::InstanceOf.get_idx(&storage.get_dictionary()), + ); + let expected = expected.to_csc(); + + if actual == expected { + Ok(()) + } else { + Err(String::from("Expected and actual results are not equals").into()) + } +} diff --git a/tests/orientation.rs b/tests/orientation.rs index 71c05df..cf6a737 100644 --- a/tests/orientation.rs +++ b/tests/orientation.rs @@ -79,49 +79,16 @@ fn orientation_pso_tabular_test() -> Result<(), Box> { .load(Backend::FileSystem(common::TABULAR_PSO_ZARR))? .get_predicate(common::Predicate::InstanceOf.into())? { - OpsFormat::SparseArray(actual) => actual, + OpsFormat::Zarr(actual) => actual, _ => unreachable!(), }; - println!("{}", storage.get_sparse_array().unwrap().to_dense()); - - storage - .get_dictionary() - .subjects() - .iter() - .for_each(|(i, e)| println!("{} {}", i, std::str::from_utf8(&e).unwrap().to_string())); - - println!(); - - storage - .get_dictionary() - .predicates() - .iter() - .for_each(|(i, e)| println!("{} {}", i, std::str::from_utf8(&e).unwrap().to_string())); - - println!(); - - storage - .get_dictionary() - .objects() - .iter() - .for_each(|(i, e)| println!("{} {}", i, std::str::from_utf8(&e).unwrap().to_string())); - - println!( - "{:?}", - storage - .get_dictionary() - .get_subject_idx(common::Subject::Warrington.into()) - ); - - Ok(()) - - // if actual == vec![3, 1, 1] { - // Ok(()) - // } else { - // println!("{:?}", actual); - // Err(String::from("Expected and actual results are not equals").into()) - // } + if actual == vec![3, 1, 1] { + Ok(()) + } else { + println!("{:?}", actual); + Err(String::from("Expected and actual results are not equals").into()) + } } #[test]