Skip to content

Commit

Permalink
working on it
Browse files Browse the repository at this point in the history
  • Loading branch information
angelip2303 committed Mar 16, 2024
1 parent f30f4f0 commit 1f76d89
Show file tree
Hide file tree
Showing 15 changed files with 202 additions and 93 deletions.
4 changes: 2 additions & 2 deletions examples/ntriples/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use remote_hdt::error::RemoteHDTError;
use remote_hdt::storage::layout::tabular::TabularLayout;
use remote_hdt::storage::params::{Backend, ChunkingStrategy, ReferenceSystem, Serialization};
use remote_hdt::storage::params::*;
use remote_hdt::storage::Storage;

pub fn main() -> Result<(), RemoteHDTError> {
Storage::new(TabularLayout, Serialization::Zarr).serialize(
Backend::FileSystem("root.zarr"),
"examples/ntriples/rdf.nt",
ChunkingStrategy::Chunk,
ReferenceSystem::SPO,
Optimization::Storage(ReferenceSystem::SPO),
)?;

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions examples/rdf_xml/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use remote_hdt::error::RemoteHDTError;
use remote_hdt::storage::layout::tabular::TabularLayout;
use remote_hdt::storage::params::{Backend, ChunkingStrategy, ReferenceSystem, Serialization};
use remote_hdt::storage::params::*;
use remote_hdt::storage::Storage;

pub fn main() -> Result<(), RemoteHDTError> {
Storage::new(TabularLayout, Serialization::Zarr).serialize(
Backend::FileSystem("root.zarr"),
"examples/rdf_xml/rdf.rdf",
ChunkingStrategy::Chunk,
ReferenceSystem::SPO,
Optimization::Storage(ReferenceSystem::SPO),
)?;

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions examples/serialize.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use remote_hdt::error::RemoteHDTError;
use remote_hdt::storage::layout::matrix::MatrixLayout;
use remote_hdt::storage::params::{Backend, ChunkingStrategy, ReferenceSystem, Serialization};
use remote_hdt::storage::params::*;
use remote_hdt::storage::Storage;
use std::env;
use std::time::Instant;
Expand All @@ -21,7 +21,7 @@ fn main() -> Result<(), RemoteHDTError> {
Backend::FileSystem(zarr_path),
rdf_path,
ChunkingStrategy::Sharding(*shard_size),
ReferenceSystem::SPO,
Optimization::Query,
)?;

println!("Elapsed time: {:.2?}", before.elapsed());
Expand Down
4 changes: 2 additions & 2 deletions examples/turtle/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use remote_hdt::error::RemoteHDTError;
use remote_hdt::storage::layout::tabular::TabularLayout;
use remote_hdt::storage::params::{Backend, ChunkingStrategy, ReferenceSystem, Serialization};
use remote_hdt::storage::params::*;
use remote_hdt::storage::Storage;

pub fn main() -> Result<(), RemoteHDTError> {
Storage::new(TabularLayout, Serialization::Zarr).serialize(
Backend::FileSystem("root.zarr"),
"examples/turtle/rdf.ttl",
ChunkingStrategy::Chunk,
ReferenceSystem::SPO,
Optimization::Storage(ReferenceSystem::SPO),
)?;

Ok(())
Expand Down
58 changes: 36 additions & 22 deletions src/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use super::utils::hash_to_set;

#[derive(Clone)]
pub struct Dictionary {
reference_system: ReferenceSystem,
subjects: Set,
predicates: Set,
objects: Set,
Expand All @@ -16,7 +15,6 @@ pub struct Dictionary {
impl Default for Dictionary {
fn default() -> Self {
Dictionary {
reference_system: ReferenceSystem::SPO,
subjects: Set::new(vec!["PlaceHolder"]).unwrap(),
predicates: Set::new(vec!["PlaceHolder"]).unwrap(),
objects: Set::new(vec!["PlaceHolder"]).unwrap(),
Expand All @@ -26,27 +24,23 @@ impl Default for Dictionary {

impl Dictionary {
pub(crate) fn from_vec_str(
reference_system: ReferenceSystem,
subjects: &Vec<String>,
predicates: &Vec<String>,
objects: &Vec<String>,
) -> Self {
Dictionary {
reference_system,
subjects: Set::new(subjects).unwrap(),
predicates: Set::new(predicates).unwrap(),
objects: Set::new(objects).unwrap(),
}
}

pub(crate) fn from_set_terms(
reference_system: ReferenceSystem,
subjects: HashSet<String>,
predicates: HashSet<String>,
objects: HashSet<String>,
) -> Self {
Dictionary {
reference_system,
subjects: Set::new(hash_to_set(subjects)).unwrap(),
predicates: Set::new(hash_to_set(predicates)).unwrap(),
objects: Set::new(hash_to_set(objects)).unwrap(),
Expand Down Expand Up @@ -77,49 +71,69 @@ impl Dictionary {
self.objects.to_owned()
}

pub fn get_reference_system(&self) -> ReferenceSystem {
self.reference_system.to_owned()
}

pub fn get_subject_idx(&self, subject: &str) -> Option<usize> {
pub fn get_subject_idx(
&self,
subject: &str,
reference_system: &ReferenceSystem,
) -> Option<usize> {
let mut locator = self.subjects.locator();
match self.reference_system {
match reference_system {
ReferenceSystem::PSO | ReferenceSystem::OSP => {
locator.run(subject).map(|value| value + 1)
}
_ => locator.run(subject),
}
}

pub fn get_subject_idx_unchecked(&self, subject: &str) -> usize {
self.get_subject_idx(subject).unwrap()
pub fn get_subject_idx_unchecked(
&self,
subject: &str,
reference_system: &ReferenceSystem,
) -> usize {
self.get_subject_idx(subject, reference_system).unwrap()
}

pub fn get_predicate_idx(&self, predicate: &str) -> Option<usize> {
pub fn get_predicate_idx(
&self,
predicate: &str,
reference_system: &ReferenceSystem,
) -> Option<usize> {
let mut locator = self.predicates.locator();
match self.reference_system {
match reference_system {
ReferenceSystem::SPO | ReferenceSystem::OPS => {
locator.run(predicate).map(|value| value + 1)
}
_ => locator.run(predicate),
}
}

pub fn get_predicate_idx_unchecked(&self, predicate: &str) -> usize {
self.get_predicate_idx(predicate).unwrap()
pub fn get_predicate_idx_unchecked(
&self,
predicate: &str,
reference_system: &ReferenceSystem,
) -> usize {
self.get_predicate_idx(predicate, reference_system).unwrap()
}

pub fn get_object_idx(&self, object: &str) -> Option<usize> {
pub fn get_object_idx(
&self,
object: &str,
reference_system: &ReferenceSystem,
) -> Option<usize> {
let mut locator = self.objects.locator();
match self.reference_system {
match reference_system {
ReferenceSystem::SOP | ReferenceSystem::POS => {
locator.run(object).map(|value| value + 1)
}
_ => locator.run(object),
}
}

pub fn get_object_idx_unchecked(&self, object: &str) -> usize {
self.get_object_idx(object).unwrap()
pub fn get_object_idx_unchecked(
&self,
object: &str,
reference_system: &ReferenceSystem,
) -> usize {
self.get_object_idx(object, reference_system).unwrap()
}
}
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ pub enum RemoteHDTError {
ObjectsNotInJSON,
#[error("The Reference System has not been serialized properly")]
ReferenceSystemNotInJSON,
#[error("The Optimization has not been serialized properly")]
OptimizationNotInJSON,
#[error("Error serializing the triples of the Graph")]
TripleSerialization,
#[error("The provided path is not valid")]
Expand Down
12 changes: 7 additions & 5 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,16 @@ trait Backend<T: TriplesParser, E: From<<T>::Error>> {
ReferenceSystem::OSP | ReferenceSystem::OPS => objects.len(),
}
];
let dictionary =
Dictionary::from_set_terms(reference_system.to_owned(), subjects, predicates, objects);
let dictionary = Dictionary::from_set_terms(subjects, predicates, objects);

if let Err(err) = Self::parser_fn(path, &mut |triple: Triple| {
{
let sidx = dictionary.get_subject_idx_unchecked(&triple.subject.to_string());
let pidx = dictionary.get_predicate_idx_unchecked(&triple.predicate.to_string());
let oidx = dictionary.get_object_idx_unchecked(&triple.object.to_string());
let sidx = dictionary
.get_subject_idx_unchecked(&triple.subject.to_string(), reference_system);
let pidx = dictionary
.get_predicate_idx_unchecked(&triple.predicate.to_string(), reference_system);
let oidx = dictionary
.get_object_idx_unchecked(&triple.object.to_string(), reference_system);

match reference_system {
ReferenceSystem::SPO => {
Expand Down
5 changes: 2 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use clap::Parser;
use remote_hdt::storage::layout::tabular::TabularLayout;
use remote_hdt::storage::params::Backend;
use remote_hdt::storage::params::ChunkingStrategy;
use remote_hdt::storage::params::ReferenceSystem;
use remote_hdt::storage::params::Optimization;
use remote_hdt::storage::params::Serialization;
use remote_hdt::storage::Storage;
use remote_hdt::storage::StorageResult;
Expand All @@ -13,7 +13,6 @@ struct Args {
/// Input RDF file
#[arg(short, long)]
rdf: String,

/// Output Zarr directory
#[arg(short, long, default_value = "root.zarr")]
zarr: String,
Expand All @@ -25,7 +24,7 @@ fn main() -> StorageResult<()> {
Backend::FileSystem(&args.zarr),
&args.rdf,
ChunkingStrategy::Chunk,
ReferenceSystem::SPO,
Optimization::Query,
)?;
Ok(())
}
2 changes: 1 addition & 1 deletion src/storage/layout/matrix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl Layout<Chunk> for MatrixLayout {

fn chunk_shape(
&self,
chunking_strategy: ChunkingStrategy,
chunking_strategy: &ChunkingStrategy,
dimensionality: &Dimensionality,
) -> ChunkGrid {
vec![
Expand Down
37 changes: 22 additions & 15 deletions src/storage/layout/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use zarrs::array::DataType;
use zarrs::array::DimensionName;
use zarrs::array::FillValue;
use zarrs::array_subset::ArraySubset;
use zarrs::group::Group;
use zarrs::storage::store::FilesystemStore;
use zarrs::storage::ReadableStorageTraits;

Expand All @@ -19,6 +20,7 @@ use crate::utils::columns_per_shard;
use crate::utils::rows_per_shard;
use crate::utils::value_to_term;

use super::params::Optimization;
use super::ChunkingStrategy;
use super::Dimensionality;
use super::ReferenceSystem;
Expand All @@ -31,6 +33,24 @@ pub mod matrix;
pub mod tabular;

pub trait LayoutOps<C> {
/// Reads the `"optimization"` attribute stored on `group` and converts it
/// into an [`Optimization`].
///
/// # Errors
///
/// Returns [`RemoteHDTError::OptimizationNotInJSON`] when the group has no
/// `"optimization"` attribute, or when that attribute cannot be viewed as a
/// string (`as_str` yields `None`). The latter case used to panic through
/// `unwrap()`; it is now reported through the same error as a missing key.
fn retrieve_group_attributes(
    &mut self,
    group: &Group<dyn ReadableStorageTraits>,
) -> StorageResult<Optimization> {
    // A missing key and a non-string value both mean the optimization was
    // not serialized properly, so they collapse into a single error.
    let optimization = group
        .attributes()
        .get("optimization")
        .and_then(|value| value.as_str())
        .ok_or(RemoteHDTError::OptimizationNotInJSON)?;

    Ok(optimization.into())
}

fn retrieve_attributes(
&mut self,
arr: &Array<dyn ReadableStorageTraits>,
Expand All @@ -51,20 +71,7 @@ pub trait LayoutOps<C> {
None => return Err(RemoteHDTError::ObjectsNotInJSON),
});

let reference_system: ReferenceSystem = match attributes.get("reference_system") {
Some(reference_system) => reference_system,
None => return Err(RemoteHDTError::ReferenceSystemNotInJSON),
}
.as_str()
.unwrap()
.into();

Ok(Dictionary::from_vec_str(
reference_system,
subjects,
predicates,
objects,
))
Ok(Dictionary::from_vec_str(subjects, predicates, objects))
}

fn serialize(&mut self, arr: &Array<FilesystemStore>, graph: Graph) -> StorageResult<()> {
Expand Down Expand Up @@ -172,7 +179,7 @@ pub trait Layout<C>: LayoutOps<C> {
fn data_type(&self) -> DataType;
fn chunk_shape(
&self,
chunking_strategy: ChunkingStrategy,
chunking_strategy: &ChunkingStrategy,
dimensionality: &Dimensionality,
) -> ChunkGrid;
fn fill_value(&self) -> FillValue;
Expand Down
2 changes: 1 addition & 1 deletion src/storage/layout/tabular.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl Layout<Chunk> for TabularLayout {
DataType::UInt32
}

fn chunk_shape(&self, chunking_strategy: ChunkingStrategy, _: &Dimensionality) -> ChunkGrid {
fn chunk_shape(&self, chunking_strategy: &ChunkingStrategy, _: &Dimensionality) -> ChunkGrid {
vec![chunking_strategy.into(), NonZeroU64::new(3).unwrap()].into() // TODO: make this a constant value
}

Expand Down
Loading

0 comments on commit 1f76d89

Please sign in to comment.