diff --git a/Cargo.toml b/Cargo.toml index b2f7a51..ef7b0c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ version = "0.0.1" edition = "2021" [dependencies] -zarrs = { version = "0.12.4", default-features = false, features = [ "http", "gzip", "sharding", "opendal", "async", "ndarray", "crc32c" ] } +zarrs = { version = "0.12.4", default-features = false, features = [ "http", "gzip", "sharding", "async", "ndarray", "crc32c" ] } clap = { version = "4.1.8", features = ["derive"] } serde_json = "1.0.108" thiserror = "1.0.50" @@ -17,9 +17,6 @@ rio_api = "0.8.4" rayon = "1.8.0" parking_lot = "0.12" -[target.'cfg(not(target_env = "msvc"))'.dependencies] -jemallocator = "0.5.0" - [profile.release] codegen-units = 1 opt-level = 3 diff --git a/examples/query_bench.rs b/examples/query_bench.rs index f97e448..711effc 100644 --- a/examples/query_bench.rs +++ b/examples/query_bench.rs @@ -6,7 +6,7 @@ use remote_hdt::storage::Storage; use std::env; use std::time::Instant; -const SUBJECT: &str = ""; +const SUBJECT: &str = ""; fn main() -> Result<(), RemoteHDTError> { let args: Vec = env::args().collect(); @@ -21,7 +21,7 @@ fn main() -> Result<(), RemoteHDTError> { let arr = binding.load(Backend::FileSystem(format!("{}.zarr", zarr_path).as_str()))?; let before = Instant::now(); - arr.get_subject(SUBJECT)?; + arr.get_object(SUBJECT)?; println!("Elapsed time: {:.2?}", before.elapsed()); diff --git a/examples/serialize.rs b/examples/serialize.rs index 421c892..9ca4192 100644 --- a/examples/serialize.rs +++ b/examples/serialize.rs @@ -5,10 +5,6 @@ use remote_hdt::storage::Storage; use std::env; use std::time::Instant; -#[cfg(not(target_env = "msvc"))] -#[global_allocator] -static ALLOCATOR: jemallocator::Jemalloc = jemallocator::Jemalloc; - fn main() -> Result<(), RemoteHDTError> { let args: Vec = env::args().collect(); if args.len() <= 3 { diff --git a/src/engine/chunk.rs b/src/engine/chunk.rs index 9870474..edf515c 100644 --- a/src/engine/chunk.rs +++ b/src/engine/chunk.rs @@ -9,7 +9,7 @@ use crate::utils::rows_per_shard; use super::EngineResult; use super::EngineStrategy; -impl EngineStrategy> for Array { +impl EngineStrategy> for Array { fn get_first_term(&self, index: usize) -> EngineResult> { let shard_index = index as u64 / rows_per_shard(self); let shard = self.retrieve_chunk_elements(&[shard_index, 0])?; diff --git a/src/error.rs b/src/error.rs index 7868acb..a810e88 100644 --- a/src/error.rs +++ b/src/error.rs @@ -52,8 +52,6 @@ pub enum RemoteHDTError { TripleSerialization, #[error("The provided path is not valid")] OsPathToString, - #[error(transparent)] - Opendal(#[from] zarrs::opendal::Error), #[error("The provided backend is read-only")] ReadOnlyBackend, #[error("Error while parsing the RDF graph")] diff --git a/src/storage/layout/mod.rs b/src/storage/layout/mod.rs index 6bb9338..220d673 100644 --- a/src/storage/layout/mod.rs +++ b/src/storage/layout/mod.rs @@ -9,7 +9,8 @@ use zarrs::array::DataType; use zarrs::array::DimensionName; use zarrs::array::FillValue; use zarrs::array_subset::ArraySubset; -use zarrs::storage::store::OpendalStore; +use zarrs::storage::store::FilesystemStore; +use zarrs::storage::ReadableStorageTraits; use crate::dictionary::Dictionary; use crate::error::RemoteHDTError; @@ -30,7 +31,10 @@ pub mod matrix; pub mod tabular; pub trait LayoutOps { - fn retrieve_attributes(&mut self, arr: &Array) -> StorageResult { + fn retrieve_attributes( + &mut self, + arr: &Array, + ) -> StorageResult { // 4. We get the attributes so we can obtain some values that we will need let attributes = arr.attributes(); @@ -63,7 +67,7 @@ pub trait LayoutOps { )) } - fn serialize(&mut self, arr: Array, graph: Graph) -> StorageResult<()> { + fn serialize(&mut self, arr: &Array, graph: Graph) -> StorageResult<()> { let columns = arr.shape()[1] as usize; let count = AtomicU64::new(0); let binding = self.graph_iter(graph.to_owned()); @@ -99,7 +103,7 @@ pub trait LayoutOps { fn parse( &mut self, - arr: &Array, + arr: &Array, dimensionality: &Dimensionality, ) -> StorageResult { // First, we create the 2D matrix in such a manner that the number of diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 8035e77..2047967 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -6,11 +6,11 @@ use std::sync::atomic::AtomicU32; use std::sync::Arc; use zarrs::array::Array; use zarrs::array::ArrayBuilder; +use zarrs::array_subset::ArraySubset; use zarrs::group::GroupBuilder; -use zarrs::opendal::services::Fs; -use zarrs::opendal::services::Http; -use zarrs::opendal::Operator; -use zarrs::storage::store::OpendalStore; +use zarrs::storage::store::FilesystemStore; +use zarrs::storage::store::HTTPStore; +use zarrs::storage::ReadableStorageTraits; use crate::dictionary::Dictionary; use crate::error::RemoteHDTError; @@ -41,7 +41,7 @@ pub struct Storage { layout: Box>, serialization: Serialization, reference_system: ReferenceSystem, - array: Option>, + array: Option>, sparse_array: Option, } @@ -78,29 +78,20 @@ impl Storage { reference_system: ReferenceSystem, // threading_strategy: ThreadingStrategy, TODO: implement this ) -> StorageResult<&mut Self> { - let operator = match store { + let path = match store { Backend::FileSystem(path) => { - let mut builder = Fs::default(); let path = PathBuf::from_str(path)?; match path.exists() { true => return Err(RemoteHDTError::PathExists), - false => { - let path = match path.into_os_string().into_string() { - Ok(string) => string, - Err(_) => return Err(RemoteHDTError::OsPathToString), - }; - builder.root(&path); - } + false => path, } - - Operator::new(builder)?.finish() } Backend::HTTP(_) => return Err(RemoteHDTError::ReadOnlyBackend), }; // 2. We can create the FileSystemStore appropiately - let store = Arc::new(OpendalStore::new(operator.blocking())); + let store = Arc::new(FilesystemStore::new(path)?); // Create a group and write metadata to filesystem let group = GroupBuilder::new().build(store.clone(), "/group")?; @@ -144,10 +135,13 @@ impl Storage { attributes.insert("reference_system".into(), reference_system.as_ref().into()); attributes }) - .build(store, ARRAY_NAME)?; + .build(store.clone(), ARRAY_NAME)?; arr.store_metadata()?; - self.layout.serialize(arr, graph)?; + self.layout.serialize(&arr, graph)?; + + let shape = ArraySubset::new_with_ranges(&[0..10, 1..2]); + arr.retrieve_array_subset_elements::(&shape).unwrap(); Ok(self) } @@ -157,32 +151,18 @@ impl Storage { store: Backend<'_>, // threading_strategy: ThreadingStrategy, TODO: implement this ) -> StorageResult<&mut Self> { - let operator = match store { + let store: Arc = match store { Backend::FileSystem(path) => { - let mut builder = Fs::default(); let path = PathBuf::from_str(path)?; match path.exists() { false => return Err(RemoteHDTError::PathDoesNotExist), - true => { - let path = match path.into_os_string().into_string() { - Ok(string) => string, - Err(_) => return Err(RemoteHDTError::OsPathToString), - }; - builder.root(&path); - } + true => Arc::new(FilesystemStore::new(path)?), } - - Operator::new(builder)?.finish() - } - Backend::HTTP(path) => { - let mut builder = Http::default(); - builder.endpoint(path); - Operator::new(builder)?.finish() } + Backend::HTTP(url) => Arc::new(HTTPStore::new(url)?), }; - let store: Arc = Arc::new(OpendalStore::new(operator.blocking())); let arr = Array::new(store, ARRAY_NAME)?; let dictionary = self.layout.retrieve_attributes(&arr)?; self.dictionary = dictionary; diff --git a/src/storage/ops.rs b/src/storage/ops.rs index b2fb565..afb7de2 100644 --- a/src/storage/ops.rs +++ b/src/storage/ops.rs @@ -39,7 +39,9 @@ impl Ops for Storage { Some(array) => OpsFormat::SparseArray(match self.reference_system { ReferenceSystem::SPO | ReferenceSystem::SOP => array.get_first_term(index)?, ReferenceSystem::PSO | ReferenceSystem::OSP => array.get_second_term(index)?, - ReferenceSystem::POS | ReferenceSystem::OPS => array.get_third_term(index)?, + ReferenceSystem::POS | ReferenceSystem::OPS => { + array.get_third_term(index).unwrap() + } }), None => return Err(OpsError::EmptySparseArray), }, diff --git a/src/utils.rs b/src/utils.rs index eaf6396..1c498a1 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -31,7 +31,7 @@ pub fn hash_to_set(terms: HashSet) -> Vec { vec } -pub fn rows_per_shard(arr: &Array) -> u64 { +pub fn rows_per_shard(arr: &Array) -> u64 { match arr.chunk_grid().chunk_shape(&[0, 0], arr.shape()) { Ok(shape) => match shape { Some(chunk_shape) => chunk_shape[0].into(), @@ -41,7 +41,7 @@ pub fn rows_per_shard(arr: &Array) -> u64 { } } -pub fn columns_per_shard(arr: &Array) -> u64 { +pub fn columns_per_shard(arr: &Array) -> u64 { match arr.chunk_grid().chunk_shape(&[0, 0], arr.shape()) { Ok(shape) => match shape { Some(chunk_shape) => chunk_shape[1].into(), diff --git a/tests/get_object_test.rs b/tests/get_object_test.rs index a6d0882..984ab83 100644 --- a/tests/get_object_test.rs +++ b/tests/get_object_test.rs @@ -20,7 +20,7 @@ fn get_object_matrix_sharding_test() -> Result<(), Box> { common::setup( common::SHARDING_ZARR, &mut storage, - ChunkingStrategy::Sharding(4), + ChunkingStrategy::Sharding(3), ReferenceSystem::SPO, ); diff --git a/tests/orientation.rs b/tests/orientation.rs index 5228fb4..7b56d25 100644 --- a/tests/orientation.rs +++ b/tests/orientation.rs @@ -8,6 +8,7 @@ use remote_hdt::storage::params::ChunkingStrategy; use remote_hdt::storage::params::ReferenceSystem; use remote_hdt::storage::params::Serialization; use remote_hdt::storage::Storage; +use sprs::TriMat; use std::error::Error; mod common; @@ -116,11 +117,31 @@ fn orientation_pso_tabular_test() -> Result<(), Box> { .load(Backend::FileSystem(common::TABULAR_PSO_ZARR))? .get_predicate(common::Predicate::InstanceOf.into())? { - OpsFormat::Zarr(actual) => actual, + OpsFormat::SparseArray(actual) => actual, _ => unreachable!(), }; - if actual == vec![3, 1, 1] { + let mut expected = TriMat::new(( + storage.get_dictionary().predicates_size(), + storage.get_dictionary().objects_size(), + )); + expected.add_triplet( + common::Predicate::InstanceOf.get_idx(&storage.get_dictionary()), + common::Object::Human.get_idx(&storage.get_dictionary()), + common::Subject::Alan.get_idx(&storage.get_dictionary()), + ); + expected.add_triplet( + common::Predicate::InstanceOf.get_idx(&storage.get_dictionary()), + common::Object::Town.get_idx(&storage.get_dictionary()), + common::Subject::Wilmslow.get_idx(&storage.get_dictionary()), + ); + expected.add_triplet( + common::Predicate::InstanceOf.get_idx(&storage.get_dictionary()), + common::Object::Computer.get_idx(&storage.get_dictionary()), + common::Subject::Bombe.get_idx(&storage.get_dictionary()), + ); + + if actual == expected.to_csc() { Ok(()) } else { println!("{:?}", actual); @@ -130,7 +151,7 @@ fn orientation_pso_tabular_test() -> Result<(), Box> { #[test] fn orientation_ops_tabular_test() -> Result<(), Box> { - let mut storage = Storage::new(TabularLayout, Serialization::Zarr); + let mut storage = Storage::new(TabularLayout, Serialization::Sparse); common::setup( common::TABULAR_OPS_ZARR, @@ -143,11 +164,41 @@ fn orientation_ops_tabular_test() -> Result<(), Box> { .load(Backend::FileSystem(common::TABULAR_OPS_ZARR))? .get_subject(common::Subject::Alan.into())? { - OpsFormat::Zarr(actual) => actual, + OpsFormat::SparseArray(actual) => actual, _ => unreachable!(), }; - if actual == vec![1, 3, 4, 0, 0, 0, 0, 6, 7] { + let mut expected = TriMat::new(( + storage.get_dictionary().objects_size(), + storage.get_dictionary().subjects_size(), + )); + expected.add_triplet( + common::Object::Human.get_idx(&storage.get_dictionary()), + common::Subject::Alan.get_idx(&storage.get_dictionary()), + common::Predicate::InstanceOf.get_idx(&storage.get_dictionary()), + ); + expected.add_triplet( + common::Object::Warrington.get_idx(&storage.get_dictionary()), + common::Subject::Alan.get_idx(&storage.get_dictionary()), + common::Predicate::PlaceOfBirth.get_idx(&storage.get_dictionary()), + ); + expected.add_triplet( + common::Object::Wilmslow.get_idx(&storage.get_dictionary()), + common::Subject::Alan.get_idx(&storage.get_dictionary()), + common::Predicate::PlaceOfDeath.get_idx(&storage.get_dictionary()), + ); + expected.add_triplet( + common::Object::Date.get_idx(&storage.get_dictionary()), + common::Subject::Alan.get_idx(&storage.get_dictionary()), + common::Predicate::DateOfBirth.get_idx(&storage.get_dictionary()), + ); + expected.add_triplet( + common::Object::GCHQ.get_idx(&storage.get_dictionary()), + common::Subject::Alan.get_idx(&storage.get_dictionary()), + common::Predicate::Employer.get_idx(&storage.get_dictionary()), + ); + + if actual == expected.to_csc() { Ok(()) } else { Err(String::from("Expected and actual results are not equals").into())