diff --git a/examples/complement_bench.rs b/examples/complement_bench.rs
index 0ef8b2f..65199f0 100644
--- a/examples/complement_bench.rs
+++ b/examples/complement_bench.rs
@@ -2,8 +2,18 @@ use remote_hdt::error::RemoteHDTError;
 use remote_hdt::complement::layout::default::DefaultComplementLayout;
 use remote_hdt::complement::ops::Ops;
 use remote_hdt::complement::ComplementStorage;
+use remote_hdt::complement::params::Backend;
+
+use std::env;
 
-fn main() {
+fn main() -> Result<(), RemoteHDTError> {
+    let args: Vec<String> = env::args().collect();
+    if args.len() <= 1 {
+        panic!("Usage: cargo run --example complement_bench <number_of_universities>");
+    }
+
+    let number_of_universities: &String = &args[1];
+    let zarr_path = format!("{}-lubm", number_of_universities);
+
     let mut binding = ComplementStorage::new(DefaultComplementLayout);
-
+    let storage = binding.load(Backend::FileSystem(format!("{}.zarr", zarr_path).as_str()))?;
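+
+    // Minimal sanity check, assuming the array was serialized beforehand
+    // with `ComplementStorage::serialize`: print how many distinct nodes
+    // were read back from the Zarr attributes
+    println!("Loaded {} nodes", storage.get_dictionary().nodes_size());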
+
+    Ok(())
 }
diff --git a/src/complement/layout/default.rs b/src/complement/layout/default.rs
index 6ac95e4..4480cc2 100644
--- a/src/complement/layout/default.rs
+++ b/src/complement/layout/default.rs
@@ -1 +1,61 @@
-pub struct DefaultComplementLayout;
\ No newline at end of file
+use crate::complement::layout::Chunk;
+use crate::complement::layout::ComplementLayout;
+use crate::complement::layout::ComplementLayoutOps;
+use crate::complement::layout::ComplementStorageResult;
+use crate::complement::params::ChunkingStrategy;
+use crate::complement::params::Dimensionality;
+use crate::io::Graph;
+
+use std::num::NonZeroU64;
+
+use zarrs::array::codec::array_to_bytes::sharding::ShardingCodecBuilder;
+use zarrs::array::codec::ArrayToBytesCodecTraits;
+use zarrs::array::codec::GzipCodec;
+use zarrs::array::ChunkGrid;
+use zarrs::array::DataType;
+use zarrs::array::DimensionName;
+use zarrs::array::FillValue;
+
+pub struct DefaultComplementLayout;
+
+impl ComplementLayout for DefaultComplementLayout {
+    fn shape(&self, dimensionality: &Dimensionality) -> Vec<u64> {
+        vec![
+            dimensionality.get_graph_size(),
+            dimensionality.get_nodes_size(),
+        ]
+    }
+
+    fn data_type(&self) -> DataType {
+        DataType::UInt64
+    }
+
+    fn chunk_shape(&self, chunking_strategy: ChunkingStrategy, _: &Dimensionality) -> ChunkGrid {
+        vec![chunking_strategy.into(), NonZeroU64::new(3).unwrap()].into() // TODO: make this a constant value
+    }
+
+    fn fill_value(&self) -> FillValue {
+        FillValue::from(0u64)
+    }
+
+    fn dimension_names(&self) -> Option<Vec<DimensionName>> {
+        Some(vec![
+            DimensionName::new("Triples"),
+            DimensionName::new("Fields"),
+        ])
+    }
+
+    // TODO: revisit the sharding configuration
+    fn array_to_bytes_codec(
+        &self,
+        _: &Dimensionality,
+    ) -> ComplementStorageResult<Box<dyn ArrayToBytesCodecTraits>> {
+        let mut sharding_codec_builder = ShardingCodecBuilder::new(vec![1, 3].try_into()?);
+        sharding_codec_builder.bytes_to_bytes_codecs(vec![Box::new(GzipCodec::new(5)?)]);
+        Ok(Box::new(sharding_codec_builder.build()))
+    }
+}
+
+impl ComplementLayoutOps for DefaultComplementLayout {
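+    // Provisional implementations, adapted from the tabular layout of the
+    // main storage module while the complement semantics are settled: each
+    // triple becomes one row holding its three identifiers
+    fn graph_iter(&self, graph: Graph) -> Vec<Chunk> {
+        graph
+            .iter()
+            .enumerate()
+            .flat_map(|(subject, triples)| {
+                triples
+                    .iter()
+                    .map(|&(predicate, object)| (subject as u32, predicate, object))
+                    .collect::<Vec<Chunk>>()
+            })
+            .collect::<Vec<Chunk>>()
+    }
+
+    fn store_chunk_elements(&self, chunk: &[Chunk], _columns: usize) -> Vec<u64> {
+        let mut ans = Vec::new();
+        for &(subject, predicate, object) in chunk {
+            ans.push(subject as u64);
+            ans.push(predicate as u64);
+            ans.push(object as u64);
+        }
+        ans
+    }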
+}
\ No newline at end of file
diff --git a/src/complement/layout/mod.rs b/src/complement/layout/mod.rs
index eced7a7..46d01b9 100644
--- a/src/complement/layout/mod.rs
+++ b/src/complement/layout/mod.rs
@@ -2,41 +2,33 @@ pub mod default;
 
 use zarrs::storage::store::OpendalStore;
 use zarrs::array::Array;
 
-pub trait ComplementLayout {
-    fn retrieve_attributes(&mut self, arr: &Array<OpendalStore>) -> StorageResult<Dictionary> {
-        // 4. We get the attributes so we can obtain some values that we will need
-        let attributes = arr.attributes();
-
-        let subjects = &value_to_term(match attributes.get("subjects") {
-            Some(subjects) => subjects,
-            None => return Err(RemoteHDTError::SubjectsNotInJSON),
-        });
-        let predicates = &value_to_term(match attributes.get("predicates") {
-            Some(predicates) => predicates,
-            None => return Err(RemoteHDTError::PredicatesNotInJSON),
-        });
-        let objects = &value_to_term(match attributes.get("objects") {
-            Some(objects) => objects,
-            None => return Err(RemoteHDTError::ObjectsNotInJSON),
-        });
-
-        let reference_system: ReferenceSystem = match attributes.get("reference_system") {
-            Some(reference_system) => reference_system,
-            None => return Err(RemoteHDTError::ReferenceSystemNotInJSON),
-        }
-        .as_str()
-        .unwrap()
-        .into();
-
-        Ok(Dictionary::from_vec_str(
-            reference_system,
-            subjects,
-            predicates,
-            objects,
-        ))
-    }
+use zarrs::array::codec::ArrayToBytesCodecTraits;
+use zarrs::array::ChunkGrid;
+use zarrs::array::DataType;
+use zarrs::array::DimensionName;
+use zarrs::array::FillValue;
+use zarrs::array_subset::ArraySubset;
+
+use std::sync::atomic::AtomicU64;
+use std::sync::atomic::Ordering;
+
+use crate::complement::params::ChunkingStrategy;
+use crate::complement::params::Dimensionality;
+use crate::dictionary::NodesDictionary;
+use crate::error::RemoteHDTError;
+use crate::io::Graph;
+
+use crate::utils::columns_per_shard;
+use crate::utils::rows_per_shard;
+use crate::utils::value_to_term;
 
-    fn serialize(&mut self, arr: Array<OpendalStore>, graph: Graph) -> StorageResult<()> {
+pub type ComplementStorageResult<T> = Result<T, RemoteHDTError>;
+type ArrayToBytesCodec = Box<dyn ArrayToBytesCodecTraits>;
+
+// Each triple is flattened into a row of three identifiers; the alias mirrors
+// the `Chunk` type of the main storage layouts
+pub type Chunk = (u32, u32, u32);
+
+pub trait ComplementLayoutOps {
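+    // The complement array only stores a `nodes` attribute (see
+    // `ComplementStorage::serialize`), so, unlike the main storage, we
+    // rebuild a `NodesDictionary` instead of a full `Dictionary`
+    fn retrieve_attributes(
+        &mut self,
+        arr: &Array<OpendalStore>,
+    ) -> ComplementStorageResult<NodesDictionary> {
+        let attributes = arr.attributes();
+
+        let nodes = &value_to_term(match attributes.get("nodes") {
+            Some(nodes) => nodes,
+            // TODO: add a dedicated NodesNotInJSON variant to RemoteHDTError
+            None => return Err(RemoteHDTError::SubjectsNotInJSON),
+        });
+
+        Ok(NodesDictionary::from_vec_str(nodes))
+    }
+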
+    fn serialize(&mut self, arr: Array<OpendalStore>, graph: Graph) -> ComplementStorageResult<()> {
         let columns = arr.shape()[1] as usize;
         let count = AtomicU64::new(0);
         let binding = self.graph_iter(graph.to_owned());
@@ -64,70 +56,26 @@
         Ok(())
     }
 
-    fn parse(
-        &mut self,
-        arr: &Array<OpendalStore>,
-        dimensionality: &Dimensionality,
-    ) -> StorageResult<ZarrArray> {
-        // First, we create the 2D matrix in such a manner that the number of
-        // rows is the same as the size of the first terms; i.e, in the SPO
-        // orientation, that will be equals to the number of subjects, while
-        // the number of columns is equals to the size of the third terms; i.e,
-        // following the same example as before, it will be equals to the number
-        // of objects. In our case the dimensionality abstracts the process
-        // of getting the size of the concrete dimension
-        let matrix = Mutex::new(TriMat::new((
-            dimensionality.first_term_size, // we obtain the size of the first terms
-            dimensionality.third_term_size, // we obtain the size of the third terms
-        )));
-
-        // We compute the number of shards; for us to achieve so, we have to obtain
-        // first dimension of the chunk grid
-        let number_of_shards = match arr.chunk_grid_shape() {
-            Some(chunk_grid) => chunk_grid[0],
-            None => 0,
-        };
-
-        let number_of_columns = arr.shape()[1] as usize;
-
-        // For each chunk in the Zarr array we retrieve it and parse it into a
-        // matrix, inserting the triplet in its corresponding position. The idea
-        // of parsing the array chunk-by-chunk allows us to keep the RAM usage
-        // low, as instead of parsing the whole array, we process smaller pieces
-        // of it. Once we have all the pieces processed, we will have parsed the
-        // whole array
-        for shard in 0..number_of_shards {
-            arr.retrieve_chunk_elements(&[shard, 0])?
-                // We divide each shard by the number of columns, as a shard is
-                // composed of chunks having the size of [1, number of cols]
-                .chunks(number_of_columns)
-                .enumerate()
-                .for_each(|(first_term_idx, chunk)| {
-                    self.retrieve_chunk_elements(
-                        &matrix,
-                        first_term_idx + (shard * rows_per_shard(arr)) as usize,
-                        chunk,
-                    );
-                })
-        }
-
-        // We use a CSC Matrix because typically, RDF knowledge graphs tend to
-        // have more rows than columns; as such, CSC matrices are optimized
-        // for that precise scenario
-        let x = matrix.lock();
-        Ok(x.to_csc())
-    }
-
-    fn graph_iter(&self, graph: Graph) -> Vec<C>;
-    fn store_chunk_elements(&self, chunk: &[C], columns: usize) -> Vec<u64>;
-    fn retrieve_chunk_elements(
-        &mut self,
-        matrix: &Mutex<TriMat<usize>>,
-        first_term_idx: usize,
-        chunk: &[usize],
-    );
-    fn sharding_factor(&self, dimensionality: &Dimensionality) -> usize;
+    fn graph_iter(&self, graph: Graph) -> Vec<Chunk>;
+    fn store_chunk_elements(&self, chunk: &[Chunk], columns: usize) -> Vec<u64>;
 }
+
+pub trait ComplementLayout: ComplementLayoutOps {
+    fn shape(&self, dimensionality: &Dimensionality) -> Vec<u64>;
+    fn data_type(&self) -> DataType;
+    fn chunk_shape(
+        &self,
+        chunking_strategy: ChunkingStrategy,
+        dimensionality: &Dimensionality,
+    ) -> ChunkGrid;
+    fn fill_value(&self) -> FillValue;
+    fn dimension_names(&self) -> Option<Vec<DimensionName>>;
+    fn array_to_bytes_codec(
+        &self,
+        dimensionality: &Dimensionality,
+    ) -> ComplementStorageResult<ArrayToBytesCodec>;
+}
diff --git a/src/complement/mod.rs b/src/complement/mod.rs
index 7c46fcf..70115e7 100644
--- a/src/complement/mod.rs
+++ b/src/complement/mod.rs
@@ -2,27 +2,183 @@ pub mod layout;
 pub mod ops;
 pub mod params;
 
-use crate::dictionary::Dictionary;
+use serde_json::Map;
+
+use crate::dictionary::NodesDictionary;
+use crate::error::RemoteHDTError;
+use crate::io::Graph;
+use crate::io::RdfParser;
+use crate::storage::params::Serialization;
+use crate::utils::rdf_to_value;
 
+use self::layout::ComplementLayout;
+use self::params::Backend;
+use self::params::ChunkingStrategy;
 use self::params::Dimensionality;
-use self::layout::default::DefaultComplementLayout;
 
 use zarrs::storage::store::OpendalStore;
 use zarrs::array::Array;
+use zarrs::array::ArrayBuilder;
+use zarrs::group::GroupBuilder;
+use zarrs::opendal::services::Fs;
+use zarrs::opendal::services::Http;
+use zarrs::opendal::Operator;
+
+use std::path::PathBuf;
+use std::str::FromStr;
+use std::sync::Arc;
+
+pub type ComplementStorageResult<T> = Result<T, RemoteHDTError>;
+
+const ARRAY_NAME: &str = "/group/RemoteHDT";
 
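+/// Zarr-backed storage for the complement layout. The intended round trip,
+/// with illustrative paths; note that `serialize` still depends on
+/// `RdfParser::parse_nodes`, which is unimplemented as of this PR:
+///
+/// ```no_run
+/// use remote_hdt::complement::layout::default::DefaultComplementLayout;
+/// use remote_hdt::complement::params::Backend;
+/// use remote_hdt::complement::ComplementStorage;
+///
+/// let mut storage = ComplementStorage::new(DefaultComplementLayout);
+/// storage
+///     .serialize(Backend::FileSystem("lubm.zarr"), "examples/rdf/lubm.nt")
+///     .unwrap();
+/// storage.load(Backend::FileSystem("lubm.zarr")).unwrap();
+/// ```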
 pub struct ComplementStorage {
-    dictionary: Dictionary,
+    dictionary: NodesDictionary,
     dimensionality: Dimensionality,
-    array: Option<Array<OpendalStore>>
+    array: Option<Array<OpendalStore>>,
+    layout: Box<dyn ComplementLayout>,
+    serialization: Serialization,
 }
 
 impl ComplementStorage {
     pub fn new(layout: impl ComplementLayout + 'static) -> Self {
         ComplementStorage {
-
+            dictionary: NodesDictionary::default(),
+            dimensionality: Dimensionality::default(),
+            array: None,
+            layout: Box::new(layout),
+            // There is no alternative deserialization pipeline yet, so we
+            // default to Zarr
+            serialization: Serialization::Zarr,
         }
     }
+
+    pub fn get_dictionary(&self) -> NodesDictionary {
+        self.dictionary.to_owned()
+    }
+
+    pub fn serialize<'a>(
+        &mut self,
+        store: Backend<'a>,
+        rdf_path: &'a str,
+        // threading_strategy: ThreadingStrategy, TODO: implement this
+    ) -> ComplementStorageResult<&mut Self> {
+        // 1. We build the appropriate operator for the selected backend; note
+        // that we can only serialize to the local file system
+        let operator = match store {
+            Backend::FileSystem(path) => {
+                let mut builder = Fs::default();
+                let path = PathBuf::from_str(path)?;
+
+                match path.exists() {
+                    true => return Err(RemoteHDTError::PathExists),
+                    false => {
+                        let path = match path.into_os_string().into_string() {
+                            Ok(string) => string,
+                            Err(_) => return Err(RemoteHDTError::OsPathToString),
+                        };
+                        builder.root(&path);
+                    }
+                }
+
+                Operator::new(builder)?.finish()
+            }
+            Backend::HTTP(_) => return Err(RemoteHDTError::ReadOnlyBackend),
+        };
+
+        // 2. We create the FileSystemStore appropriately
+        let store = Arc::new(OpendalStore::new(operator.blocking()));
+
+        // Create a group and write its metadata to the filesystem
+        let group = GroupBuilder::new().build(store.clone(), "/group")?;
+        group.store_metadata()?;
+
+        // TODO: rayon::ThreadPoolBuilder::new()
+        //     .num_threads(1)
+        //     .build_global()
+        //     .unwrap();
+
+        // 3. Import the RDF dump using `rdf-rs`
+        let graph = match RdfParser::parse_nodes(rdf_path) {
+            Ok((graph, dictionary)) => {
+                self.dictionary = dictionary;
+                self.dimensionality = Dimensionality::new(&self.dictionary, &graph);
+                graph
+            }
+            Err(_) => return Err(RemoteHDTError::RdfParse),
+        };
+
+        // 4. Build the structure of the Array; as such, several parameters of
+        // it are tweaked. Namely, the size of the array, the size of the
+        // chunks, the name of the different dimensions and the default values
+        let nodes = self.dictionary.nodes();
+        let arr = ArrayBuilder::new(
+            self.layout.shape(&self.dimensionality),
+            self.layout.data_type(),
+            self.layout
+                .chunk_shape(ChunkingStrategy::Chunk, &self.dimensionality),
+            self.layout.fill_value(),
+        )
+        .dimension_names(self.layout.dimension_names())
+        .array_to_bytes_codec(self.layout.array_to_bytes_codec(&self.dimensionality)?)
+        .attributes({
+            let mut attributes = Map::new();
+            attributes.insert("nodes".into(), rdf_to_value(nodes));
+            attributes
+        })
+        .build(store, ARRAY_NAME)?;
+
+        arr.store_metadata()?;
+        self.layout.serialize(arr, graph)?;
+
+        Ok(self)
+    }
+
+    pub fn load<'a>(
+        &mut self,
+        store: Backend<'a>,
+        // threading_strategy: ThreadingStrategy, TODO: implement this
+    ) -> ComplementStorageResult<&mut Self> {
+        let operator = match store {
+            Backend::FileSystem(path) => {
+                let mut builder = Fs::default();
+                let path = PathBuf::from_str(path)?;
+
+                match path.exists() {
+                    false => return Err(RemoteHDTError::PathDoesNotExist),
+                    true => {
+                        let path = match path.into_os_string().into_string() {
+                            Ok(string) => string,
+                            Err(_) => return Err(RemoteHDTError::OsPathToString),
+                        };
+                        builder.root(&path);
+                    }
+                }
+
+                Operator::new(builder)?.finish()
+            }
+            Backend::HTTP(path) => {
+                let mut builder = Http::default();
+                builder.endpoint(path);
+                Operator::new(builder)?.finish()
+            }
+        };
+
+        let store: Arc<OpendalStore> = Arc::new(OpendalStore::new(operator.blocking()));
+        let arr = Array::new(store, ARRAY_NAME)?;
+        let dictionary = self.layout.retrieve_attributes(&arr)?;
+        self.dictionary = dictionary;
+        self.dimensionality = Dimensionality::new(&self.dictionary, &Graph::default());
+
+        match self.serialization {
+            Serialization::Zarr => self.array = Some(arr),
+            _ => unimplemented!(),
+        }
+
+        Ok(self)
+    }
 }
\ No newline at end of file
diff --git a/src/complement/params.rs b/src/complement/params.rs
index 4a0ced9..31eba05 100644
--- a/src/complement/params.rs
+++ b/src/complement/params.rs
@@ -1,4 +1,40 @@
+use std::num::NonZeroU64;
+
+use crate::dictionary::NodesDictionary;
+use crate::io::Graph;
+
 #[derive(Default)]
 pub struct Dimensionality {
     graph_size: Option<usize>,
-}
\ No newline at end of file
+    nodes_size: usize,
+}
+
+pub enum Backend<'a> {
+    FileSystem(&'a str),
+    HTTP(&'a str),
+}
+
+pub enum ChunkingStrategy {
+    Chunk,
+}
+
+// Required by `DefaultComplementLayout::chunk_shape`: a strategy translates
+// into the first dimension of the chunk grid, and `Chunk` maps to one row
+// per chunk
+impl From<ChunkingStrategy> for NonZeroU64 {
+    fn from(value: ChunkingStrategy) -> Self {
+        match value {
+            ChunkingStrategy::Chunk => NonZeroU64::new(1).unwrap(),
+        }
+    }
+}
+
+impl Dimensionality {
+    pub(crate) fn new(dictionary: &NodesDictionary, graph: &Graph) -> Self {
+        Dimensionality {
+            graph_size: graph
+                .iter()
+                .map(|triples| triples.len())
+                .reduce(|acc, a| acc + a),
+            nodes_size: dictionary.nodes_size(),
+        }
+    }
+
+    pub(crate) fn get_graph_size(&self) -> u64 {
+        self.graph_size.unwrap() as u64
+    }
+
+    pub(crate) fn get_nodes_size(&self) -> u64 {
+        self.nodes_size as u64
+    }
+}
diff --git a/src/dictionary.rs b/src/dictionary.rs
index d58d06c..c49b89f 100644
--- a/src/dictionary.rs
+++ b/src/dictionary.rs
@@ -123,3 +124,60 @@ impl Dictionary {
         self.get_object_idx(object).unwrap()
     }
 }
+
+#[derive(Clone)]
+pub struct NodesDictionary {
+    nodes: Set,
+}
+
+impl Default for NodesDictionary {
+    fn default() -> Self {
+        NodesDictionary {
+            nodes: Set::new(vec!["PlaceHolder"]).unwrap(),
+        }
+    }
+}
+
+impl NodesDictionary {
+    pub(crate) fn from_vec_str(nodes: &Vec<String>) -> Self {
+        NodesDictionary {
+            nodes: Set::new(nodes).unwrap(),
+        }
+    }
+
+    pub(crate) fn from_set_terms(nodes: HashSet<String>) -> Self {
+        NodesDictionary {
+            nodes: Set::new(hash_to_set(nodes)).unwrap(),
+        }
+    }
+
+    pub fn nodes_size(&self) -> usize {
+        self.nodes.len()
+    }
+
+    pub fn nodes(&self) -> Set {
+        self.nodes.to_owned()
+    }
+
+    pub fn get_node_idx(&self, node: &str) -> Option<usize> {
+        let mut locator = self.nodes.locator();
+        locator.run(node)
+    }
+
+    pub fn get_node_idx_unchecked(&self, node: &str) -> usize {
+        self.get_node_idx(node).unwrap()
+    }
+}
diff --git a/src/io/mod.rs b/src/io/mod.rs
index 7bf6a33..28f00f2 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -4,7 +4,7 @@ use std::collections::HashSet;
 use std::fs::File;
 use std::io::BufReader;
 
-use crate::dictionary::Dictionary;
+use crate::dictionary::{Dictionary, NodesDictionary};
 use crate::error::ParserError;
 use crate::storage::params::ReferenceSystem;
@@ -17,6 +17,8 @@
 mod rdf_xml;
 mod turtle;
 
 pub type RdfParserResult = Result<(Graph, Dictionary), ParserError>;
+pub type RdfParserNodesResult = Result<(Graph, NodesDictionary), ParserError>;
+
 pub type Graph = Vec<Vec<(u32, u32)>>;
 
 trait Backend<T: TriplesParser, E: From<T::Error>> {
@@ -141,4 +145,13 @@
             None => Err(ParserError::NoFormatProvided),
         }
     }
+
+    /// Parses an RDF dump without a reference system; that is, the terms are
+    /// not split into subjects, predicates and objects, but collected as
+    /// plain nodes (a possible implementation is sketched after this impl).
+    pub fn parse_nodes(path: &str) -> RdfParserNodesResult {
+        unimplemented!()
+    }
 }
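+
+// A possible body for `parse_nodes`, kept as a comment until the nodes
+// pipeline is wired up: reuse the existing `parse` machinery and merge the
+// three term sets of the resulting `Dictionary` into a single set of nodes.
+// `terms_of` stands for whatever accessor ends up decoding the strings of an
+// `fcsd::Set`; it does not exist yet:
+//
+// pub fn parse_nodes(path: &str) -> RdfParserNodesResult {
+//     let (graph, dictionary) = Self::parse(path)?; // plus the reference
+//     // system argument, if `parse` keeps requiring one
+//     let mut nodes: HashSet<String> = HashSet::new();
+//     nodes.extend(terms_of(dictionary.subjects()));
+//     nodes.extend(terms_of(dictionary.predicates()));
+//     nodes.extend(terms_of(dictionary.objects()));
+//     Ok((graph, NodesDictionary::from_set_terms(nodes)))
+// }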