@@ -1,16 +1,29 @@
use remote_hdt::error::RemoteHDTError;
use remote_hdt::metadata::Metadata;
-use remote_hdt::storage::params::Serialization;
+use remote_hdt::storage::params::Backend;
use remote_hdt::storage::params::ReferenceSystem;
+use remote_hdt::storage::params::Serialization;
+use remote_hdt::storage::layout::metadata::MetadataLayout;
+
+use remote_hdt::storage::params::ChunkingStrategy;

fn main() -> Result<(), RemoteHDTError> {
-    let rdf_path = "";
+    let rdf_path = "resources/1-lubm.ttl";
    let metadata_path = "";
+    let zarr_path = "1-lubm-metadata.zarr";
    let fields = vec!["X_pos", "Y_pos"];
-    let mut metadata: Metadata = Metadata::new(Serialization::Zarr);
-    metadata.serialize(rdf_path, ReferenceSystem::SPO, metadata_path, fields).unwrap();
+    let mut metadata = Metadata::new(MetadataLayout, Serialization::Zarr);
+    metadata
+        .serialize(
+            Backend::FileSystem(zarr_path),
+            rdf_path,
+            ChunkingStrategy::Sharding(1024),
+            ReferenceSystem::SPO,
+            metadata_path,
+            fields,
+        )
+        .unwrap();

    Ok(())
}
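Read end to end, the added lines assemble into the updated example below. This is simply the new version of the file gathered in one place for reference, with the imports regrouped for readability; the empty metadata_path and the hard-coded resources/1-lubm.ttl and 1-lubm-metadata.zarr paths are taken verbatim from the diff above:

    use remote_hdt::error::RemoteHDTError;
    use remote_hdt::metadata::Metadata;
    use remote_hdt::storage::layout::metadata::MetadataLayout;
    use remote_hdt::storage::params::Backend;
    use remote_hdt::storage::params::ChunkingStrategy;
    use remote_hdt::storage::params::ReferenceSystem;
    use remote_hdt::storage::params::Serialization;

    fn main() -> Result<(), RemoteHDTError> {
        // Input RDF, output Zarr store, and the metadata fields to keep.
        let rdf_path = "resources/1-lubm.ttl";
        let metadata_path = "";
        let zarr_path = "1-lubm-metadata.zarr";
        let fields = vec!["X_pos", "Y_pos"];

        // The constructor now takes the layout together with the serialization format.
        let mut metadata = Metadata::new(MetadataLayout, Serialization::Zarr);
        metadata
            .serialize(
                Backend::FileSystem(zarr_path),
                rdf_path,
                ChunkingStrategy::Sharding(1024),
                ReferenceSystem::SPO,
                metadata_path,
                fields,
            )
            .unwrap();

        Ok(())
    }

The visible API change is that Metadata::new now receives the layout alongside the serialization format, and serialize now needs the target backend and a chunking strategy in addition to the RDF input, reference system, metadata path, and fields.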
src/metadata/mod.rs
GitHub Actions annotations on this file: Clippy (stable) failures and Check (stable) warnings on lines 1, 9, 10, 11, 15, and 31.
@@ -1,82 +1,148 @@
use std::default;
use std::path::PathBuf;
use std::str::FromStr;
use std::collections::HashSet;
use std::sync::Arc;

use serde_json::Map;

-use crate::dictionary::Dictionary;
+use crate::storage::layout;
+use crate::storage::layout::metadata::MetadataLayout;
+use crate::storage::layout::tabular::TabularLayout;
use crate::utils::rdf_to_value;

+use crate::dictionary::Dictionary;
use crate::io::Graph;
use crate::io::RdfParser;

-use crate::storage::params::Serialization;
+use crate::error::RemoteHDTError;
+use crate::storage::layout::Layout;
+use crate::storage::params::Backend;
+use crate::storage::params::Dimensionality;
use crate::storage::params::ReferenceSystem;
-use crate::error::RemoteHDTError;
+use crate::storage::params::Serialization;
+use crate::storage::params::ChunkingStrategy;

use fcsd::Set;
+use zarrs::array::Array;
use zarrs::array::ArrayBuilder;
+use zarrs::opendal::raw::oio::StreamExt;
+use zarrs::opendal::services::Fs;
+use zarrs::opendal::services::Http;
use zarrs::opendal::Operator;
use zarrs::storage::store::OpendalStore;
-use zarrs::array::Array;

use super::utils::hash_to_set;

pub type MetadataResult<T> = Result<T, RemoteHDTError>;
+pub mod structure;

-pub struct Metadata {
-    flatten_graph: Vec<(u32, u32, u32)>,
+const ARRAY_NAME: &str = "/group/RemoteHDT"; // TODO: parameterize this
+
+pub struct Metadata<C> {
+    flatten_graph: Vec<(String)>,
    serialization: Serialization,
-    dictionary : Dictionary,
-    array: Option<Array<OpendalStore>>
+    dictionary: Dictionary,
+    array: Option<Array<OpendalStore>>,
+    dimensionality: Dimensionality,
+    layout: Box<dyn Layout<C>>,
}

-impl Metadata {
-    pub fn new(serialization: Serialization) -> Self {
+impl<C> Metadata<C> {
+    pub fn new(layout: impl Layout<C> + 'static, serialization: Serialization) -> Self {
        Metadata {
-            flatten_graph: Vec::<(u32, u32, u32)>::default(),
-            serialization: serialization,
-            dictionary: Dictionary::default(),
-            array: None,
+            flatten_graph: Vec::<String>::default(),
+            serialization: serialization,
+            dictionary: Dictionary::default(),
+            array: None,
+            dimensionality: Default::default(),
+            layout: Box::new(layout),
        }
    }

-    pub fn serialize(&mut self, rdf_path: &str, reference_system: ReferenceSystem, metadata_path: &str, fields: Vec<&str>) -> MetadataResult<&mut Self> {
-        let graph_vector: Graph;
+    pub fn serialize<'a>(
+        &mut self,
+        store: Backend<'a>,
+        rdf_path: &str,
+        chunking_strategy: ChunkingStrategy,
+        reference_system: ReferenceSystem,
+        metadata_path: &str,
+        fields: Vec<&str>,
+    ) -> MetadataResult<&mut Self> {
+        let operator = match store {
+            Backend::FileSystem(path) => {
+                let mut builder = Fs::default();
+                let path = PathBuf::from_str(path)?;
+
+                match path.exists() {
+                    true => return Err(RemoteHDTError::PathExists),
+                    false => {
+                        let path = match path.into_os_string().into_string() {
+                            Ok(string) => string,
+                            Err(_) => return Err(RemoteHDTError::OsPathToString),
+                        };
+                        builder.root(&path);
+                    }
+                }
+
+                Operator::new(builder)?.finish()
+            }
+            Backend::HTTP(_) => return Err(RemoteHDTError::ReadOnlyBackend),
+        };
+
+        // 2. We can create the FileSystemStore appropriately
+        let store = Arc::new(OpendalStore::new(operator.blocking()));

-        match RdfParser::parse(rdf_path, &reference_system) {
+        let graph = match RdfParser::parse(rdf_path, &reference_system) {
            Ok((graph, dictionary)) => {
-                graph_vector = graph;
                self.dictionary = dictionary;
+                self.dimensionality = Dimensionality::new(&self.dictionary, &graph);
+                graph
            }
            Err(_) => return Err(RemoteHDTError::RdfParse),
        };

        // Flatten the graph into triples
        let mut count = 0;
-        for i in graph_vector.iter() {
-            for j in i.iter() {
-                self.flatten_graph.push((count, j.0, j.1))
+        for i in graph.iter() {
+            for j in i.iter() {
+                self.flatten_graph.push(format!("{};{};{}", count, j.0, j.1))
            }
-            count +=1;
+            count += 1;
        }

        // TODO: change the implementation so the flattening is only done here
        let triples: HashSet<_> = self.flatten_graph.clone().into_iter().collect();
        let subjects = self.dictionary.subjects();
        let predicates = self.dictionary.predicates();
        let objects = self.dictionary.objects();

+        let arr = ArrayBuilder::new(
+            self.layout.shape(&self.dimensionality),
+            self.layout.data_type(),
+            self.layout
+                .chunk_shape(chunking_strategy, &self.dimensionality),
+            self.layout.fill_value(),
+        )
+        .dimension_names(self.layout.dimension_names(&reference_system))
+        .array_to_bytes_codec(self.layout.array_to_bytes_codec(&self.dimensionality)?)
+        .attributes({
+            let mut attributes = Map::new();
+            attributes.insert("triples".into(), rdf_to_value(Set::new(hash_to_set(triples)).unwrap()));
+            attributes.insert("subjects".into(), rdf_to_value(subjects));
+            attributes.insert("predicates".into(), rdf_to_value(predicates));
+            attributes.insert("objects".into(), rdf_to_value(objects));
+            attributes.insert("reference_system".into(), reference_system.as_ref().into());
+            attributes
+        })
+        .build(store, ARRAY_NAME)?;
+
+        arr.store_metadata()?;
+        self.layout.serialize(arr, graph)?;

        Ok(self)
    }
}
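The flattening loop above rewrites each parsed (second_term, third_term) pair as a "first;second;third" string keyed by the running first-term counter; those strings are then deduplicated into a HashSet and written into the triples attribute. A minimal standalone sketch of that step, using a plain Vec<Vec<(u32, u32)>> as a stand-in for the crate's Graph type (an assumption made only for illustration):

    // Illustrative sketch of the flattening performed in Metadata::serialize above.
    fn flatten(graph: &[Vec<(u32, u32)>]) -> Vec<String> {
        let mut flat = Vec::new();
        for (count, terms) in graph.iter().enumerate() {
            for &(second, third) in terms {
                // One "first;second;third" entry per (second, third) pair.
                flat.push(format!("{};{};{}", count, second, third));
            }
        }
        flat
    }

    fn main() {
        let graph = vec![vec![(0, 1), (2, 3)], vec![(4, 5)]];
        assert_eq!(flatten(&graph), vec!["0;0;1", "0;2;3", "1;4;5"]);
    }

The count variable in the committed code plays the same role as enumerate here: it advances once per outer entry, not once per triple.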
src/metadata/structure/mod.rs (new file)
GitHub Actions annotations on this file: Clippy (stable) failures and Check (stable) warnings on lines 1, 2, 3, and 4.
@@ -0,0 +1,36 @@
use parking_lot::Mutex;
use sprs::TriMat;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use zarrs::array::codec::ArrayToBytesCodecTraits;
use zarrs::array::Array;
use zarrs::array::ChunkGrid;
use zarrs::array::DataType;
use zarrs::array::DimensionName;
use zarrs::array::FillValue;
use zarrs::array_subset::ArraySubset;
use zarrs::storage::store::OpendalStore;

use crate::dictionary::Dictionary;
use crate::error::RemoteHDTError;
use crate::io::Graph;
use crate::utils::columns_per_shard;
use crate::utils::rows_per_shard;
use crate::utils::value_to_term;

use super::ChunkingStrategy;
use super::Dimensionality;
use super::ReferenceSystem;

type ArrayToBytesCodec = Box<dyn ArrayToBytesCodecTraits>;

pub mod coordinates;

pub trait StructureOps<C> {
}

pub trait Structure<C>: StructureOps<C> {
}
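StructureOps<C> and Structure<C> land in this commit as empty marker traits, with a coordinates submodule declared next to them. A minimal sketch of how a concrete type would slot into that hierarchy; the CoordinatesStructure name is hypothetical and used only for illustration:

    // Trait shapes as committed: empty marker traits, one extending the other.
    pub trait StructureOps<C> {}
    pub trait Structure<C>: StructureOps<C> {}

    // Hypothetical implementor; the name is illustrative, not from the commit.
    struct CoordinatesStructure;

    impl StructureOps<(u32, u32, u32)> for CoordinatesStructure {}
    impl Structure<(u32, u32, u32)> for CoordinatesStructure {}

    fn main() {
        // Nothing to call yet: the traits define no items in this commit.
        let _structure = CoordinatesStructure;
    }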
@@ -0,0 +1,102 @@
use std::num::NonZeroU64;

use parking_lot::Mutex;
use sprs::TriMat;
use zarrs::array::codec::array_to_bytes::sharding::ShardingCodecBuilder;
use zarrs::array::codec::ArrayToBytesCodecTraits;
use zarrs::array::codec::GzipCodec;
use zarrs::array::ChunkGrid;
use zarrs::array::DataType;
use zarrs::array::DimensionName;
use zarrs::array::FillValue;

use super::ChunkingStrategy;
use super::Dimensionality;
use super::ReferenceSystem;
use super::StorageResult;

use crate::io::Graph;
use crate::storage::layout::LayoutOps;
use crate::storage::Layout;

type Chunk = (u32, u32, u32);

pub struct MetadataLayout;

impl Layout<Chunk> for MetadataLayout {
    fn shape(&self, dimensionality: &Dimensionality) -> Vec<u64> {
        vec![dimensionality.get_graph_size(), 3]
    }

    fn data_type(&self) -> DataType {
        DataType::UInt32
    }

    fn chunk_shape(&self, chunking_strategy: ChunkingStrategy, _: &Dimensionality) -> ChunkGrid {
        vec![chunking_strategy.into(), NonZeroU64::new(3).unwrap()].into() // TODO: make this a constant value
    }

    fn fill_value(&self) -> FillValue {
        FillValue::from(0u32)
    }

    fn dimension_names(&self, _: &ReferenceSystem) -> Option<Vec<DimensionName>> {
        Some(vec![
            DimensionName::new("Triples"),
            DimensionName::new("Fields"),
        ])
    }

    fn array_to_bytes_codec(
        &self,
        _: &Dimensionality,
    ) -> StorageResult<Box<dyn ArrayToBytesCodecTraits>> {
        let mut sharding_codec_builder = ShardingCodecBuilder::new(vec![1, 3].try_into()?);
        sharding_codec_builder.bytes_to_bytes_codecs(vec![Box::new(GzipCodec::new(5)?)]);
        Ok(Box::new(sharding_codec_builder.build()))
    }
}

impl LayoutOps<Chunk> for MetadataLayout {
    fn graph_iter(&self, graph: Graph) -> Vec<Chunk> {
        graph
            .iter()
            .enumerate()
            .flat_map(|(first_term, triples)| {
                let count = 0;
                triples
                    .iter()
                    .map(|&(second_term, third_term)| {
                        (count as u32, 1, 1)
                    })
                    .collect::<Vec<Chunk>>()
            })
            .collect::<Vec<Chunk>>()
    }

    fn store_chunk_elements(&self, chunk: &[Chunk], _: usize) -> Vec<u32> {
        let mut ans = Vec::new();
        for &(first_term, second_term, third_term) in chunk {
            ans.push(first_term);
            ans.push(second_term);
            ans.push(third_term);
        }
        ans
    }

    fn retrieve_chunk_elements(
        &mut self,
        matrix: &Mutex<TriMat<usize>>,
        first_term_index: usize, // TODO: will first_term_index instead of chunk[0] do the trick?
        chunk: &[usize],
    ) {
        matrix
            .lock()
            .add_triplet(chunk[0], chunk[2], chunk[1] as usize);
    }

    fn sharding_factor(&self, dimensionality: &Dimensionality) -> usize {
        dimensionality.first_term_size * dimensionality.third_term_size
    }
}
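For intuition, store_chunk_elements simply flattens a slice of (u32, u32, u32) chunks row by row into the flat u32 buffer behind the [Triples, Fields] array. A minimal sketch of that behaviour with made-up values (illustrative only, not part of the commit):

    // Mirrors MetadataLayout::store_chunk_elements from the diff above.
    fn flatten_chunks(chunk: &[(u32, u32, u32)]) -> Vec<u32> {
        let mut ans = Vec::with_capacity(chunk.len() * 3);
        for &(first, second, third) in chunk {
            ans.extend([first, second, third]);
        }
        ans
    }

    fn main() {
        // Two triples become one flat row-major buffer of six u32 values.
        assert_eq!(flatten_chunks(&[(0, 2, 5), (1, 3, 7)]), vec![0, 2, 5, 1, 3, 7]);
    }

Note that, as committed, graph_iter still emits placeholder tuples: count stays at 0 and the second and third positions are hard-coded to 1, which is why second_term and third_term go unused in the closure.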