Skip to content

Commit

Permalink
improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
angelip2303 committed Nov 23, 2023
1 parent 61d965a commit 791e1c8
Show file tree
Hide file tree
Showing 17 changed files with 302 additions and 246 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@
Cargo.lock
benches/*/*.nt
!resources/root.zarr
.vscode
.vscode
heaptrack.*
tests/out
8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,14 @@ serde_json = "1.0.108"
thiserror = "1.0.50"
fcsd = "0.2.0"
sprs = "0.11.1"
oxigraph = "0.3.20"
rio_turtle = "0.8.4"
rio_xml = "0.8.4"
rio_api = "0.8.4"
safe-transmute = "0.11.2"
rayon = "1.8.0"

[target.'cfg(not(target_env = "msvc"))'.dependencies]
jemallocator = "0.5.0"

[profile.release]
codegen-units = 1
Expand Down
6 changes: 5 additions & 1 deletion examples/serialize_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ use remote_hdt::storage::LocalStorage;
use std::env;
use std::time::Instant;

#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static ALLOCATOR: jemallocator::Jemalloc = jemallocator::Jemalloc;

fn main() {
let args: Vec<String> = env::args().collect();
if args.len() <= 1 {
Expand All @@ -18,7 +22,7 @@ fn main() {
.serialize(
format!("{}.zarr", zarr_path).as_str(),
format!("../lubm-uba-improved/out/{}.ttl", zarr_path).as_str(),
ChunkingStrategy::Sharding(1024),
ChunkingStrategy::Sharding(10240),
)
.unwrap();

Expand Down
4 changes: 2 additions & 2 deletions src/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ impl Dictionary {
/// Returns the index of `predicate` in the predicates dictionary, shifted by
/// one, or `None` when the predicate is not present.
///
/// The value found by the locator is incremented because 0 is reserved for
/// non-existent edges, so a valid predicate never maps to 0.
pub fn get_predicate_idx(&self, predicate: &str) -> Option<usize> {
    let mut locator = self.predicates.locator();
    // An unknown predicate is reported as `None` instead of panicking.
    locator.run(predicate).map(|value| value + 1)
}

Expand Down
13 changes: 8 additions & 5 deletions src/engine/chunk.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use zarrs::array::Array;
use zarrs::array_subset::ArraySubset;
use zarrs::{array::Array, storage::ReadableStorageTraits};
use zarrs::storage::ReadableStorageTraits;

use crate::error::EngineError::OperationError;
use crate::utils::{objects_per_chunk, subjects_per_chunk};
use crate::error::EngineError;
use crate::utils::objects_per_chunk;
use crate::utils::subjects_per_chunk;

use super::{EngineResult, EngineStrategy};
use super::EngineResult;
use super::EngineStrategy;

impl<T: ReadableStorageTraits> EngineStrategy<Vec<u8>> for Array<T> {
fn get_subject(&self, index: usize) -> EngineResult<Vec<u8>> {
Expand All @@ -16,7 +19,7 @@ impl<T: ReadableStorageTraits> EngineStrategy<Vec<u8>> for Array<T> {
.nth(chunk_to_index)
{
Some(ans) => Ok(ans.to_owned()),
None => Err(OperationError),
None => Err(EngineError::Operation),
}
}

Expand Down
16 changes: 14 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub enum RemoteHDTError {
#[error(transparent)]
HTTPCreate(#[from] HTTPStoreCreateError),
#[error("The Path already exists, please provide an empty path")]
PathExistsError,
PathExists,
#[error(transparent)]
GZipCompression(#[from] GzipCompressionLevelError),
#[error("The Graph you are trying to serialize is empty")]
Expand All @@ -42,5 +42,17 @@ pub enum EngineError {
#[error(transparent)]
Array(#[from] ArrayError),
#[error("Operation error")]
OperationError,
Operation,
}

#[derive(Error, Debug)]
pub enum ParserError {
#[error("Could not parse the Dicitonary: `{0}`")]
Dictionary(String),
#[error("Could not parse the Graph: `{0}`")]
Graph(String),
#[error("Format {0} not supported")]
NotSupportedFormat(String),
#[error("No format provided")]
NoFormatProvided,
}
134 changes: 84 additions & 50 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
@@ -1,72 +1,106 @@
use oxigraph::io::GraphFormat;
use oxigraph::io::GraphParser;
use std::collections::HashMap;
use rio_api::model::Triple;
use rio_api::parser::TriplesParser;
use std::collections::HashSet;
use std::fs::File;
use std::io::BufReader;

use crate::dictionary::Dictionary;
use crate::error::ParserError;

pub type Graph = HashMap<String, Vec<(String, String)>>;
use self::ntriples::NTriples;
use self::rdf_xml::RdfXml;
use self::turtle::Turtle;

pub struct RdfParser {
path: String,
format: GraphFormat,
}
mod ntriples;
mod rdf_xml;
mod turtle;

impl RdfParser {
pub fn new(path: &str) -> Result<Self, String> {
match path.split('.').last() {
Some("nt") => Ok(RdfParser {
path: path.to_string(),
format: GraphFormat::NTriples,
}),
Some("ttl") => Ok(RdfParser {
path: path.to_string(),
format: GraphFormat::Turtle,
}),
Some("rdf") => Ok(RdfParser {
path: path.to_string(),
format: GraphFormat::RdfXml,
}),
_ => Err(String::from("Not supported format for loading the dump")),
}
}
pub type RdfParserResult = Result<(Graph, Dictionary), ParserError>;
pub type Graph = Vec<Vec<(u32, u32)>>;

pub fn parse(&self) -> Result<(Graph, Dictionary), String> {
let mut graph = Graph::new();
trait Backend<T: TriplesParser, E: From<<T>::Error>> {
fn parse(path: &str) -> RdfParserResult {
// We create as many HashSets as fields we will be storing; that is, one
// for the subjects, another for the predicates, and one for the objects.
// The idea is that we will create a Dictionary matching every Term to
// an integer value; thus, we will be able to store the Triples in a
// more efficient manner
let mut subjects = HashSet::new();
let mut predicates = HashSet::new();
let mut objects = HashSet::new();

let reader = BufReader::new(match File::open(self.path.clone()) {
if let Err(err) = Self::parser_fn(path, &mut |triple: Triple| {
{
subjects.insert(triple.subject.to_string());
predicates.insert(triple.predicate.to_string());
objects.insert(triple.object.to_string());
};
Ok(())
} as Result<(), E>)
{
return Err(ParserError::Dictionary(err));
}

let mut graph = vec![Vec::new(); subjects.len()];
let dictionary = Dictionary::from_set_terms(subjects, predicates, objects);

if let Err(err) = Self::parser_fn(path, &mut |triple: Triple| {
{
let sidx = dictionary.get_subject_idx_unchecked(&triple.subject.to_string());
let pidx = dictionary.get_predicate_idx_unchecked(&triple.predicate.to_string());
let oidx = dictionary.get_object_idx_unchecked(&triple.object.to_string());
graph
.get_mut(sidx)
.unwrap()
.push((pidx as u32, oidx as u32))
};
Ok(())
} as Result<(), E>)
{
return Err(ParserError::Graph(err));
}

Ok((graph, dictionary))
}

fn parser_fn(
path: &str,
on_triple: &mut impl FnMut(Triple<'_>) -> Result<(), E>,
) -> Result<(), String> {
// We open a reader for the file that is requested to be read. The idea
// is that we will iterate over the triples stored in a certain file
let reader = BufReader::new(match File::open(path) {
Ok(file) => file,
Err(_) => return Err(String::from("Cannot open the file")),
});

let triples = match GraphParser::from_format(self.format).read_triples(reader) {
Ok(iter) => iter,
Err(_) => return Err(String::from("Error parsing the graph")),
};

for triple in triples.flatten() {
subjects.insert(triple.subject.to_owned().to_string());
predicates.insert(triple.predicate.to_owned().to_string());
objects.insert(triple.object.to_owned().to_string());

if let Some(value) = graph.get_mut(&triple.subject.to_string()) {
value.push((triple.predicate.to_string(), triple.object.to_string()));
} else {
graph.insert(
triple.subject.to_string(),
vec![(triple.predicate.to_string(), triple.object.to_string())],
);
// We create a parser that will be in charge of reading the file retrieving
// the triples that are stored in the provided file
let mut parser = Self::concrete_parser(reader);

while !parser.is_end() {
if parser.parse_step(on_triple).is_err() {
// We skip the line if it is not a valid triple
continue;
}
}

Ok((
graph,
Dictionary::from_set_terms(subjects, predicates, objects),
))
Ok(())
}

fn concrete_parser(reader: BufReader<File>) -> T;
}

pub struct RdfParser;

impl RdfParser {
    /// Parses the RDF file at `path`, choosing the parser backend from the
    /// file extension: `nt` (N-Triples), `ttl` (Turtle) or `rdf` (RDF/XML).
    ///
    /// # Errors
    ///
    /// * [`ParserError::NotSupportedFormat`] if the extension is not one of
    ///   the supported ones.
    /// * [`ParserError::NoFormatProvided`] if the path has no extension.
    /// * Whatever error the selected backend returns while parsing.
    pub fn parse(path: &str) -> RdfParserResult {
        // `Path::extension` only looks at the final path component, so dots in
        // directory names (e.g. `./data/file`) are not mistaken for a format,
        // and a path without any extension yields `None` instead of echoing
        // the whole path back as an "unsupported format".
        let extension = std::path::Path::new(path)
            .extension()
            .and_then(|ext| ext.to_str());

        match extension {
            Some("nt") => NTriples::parse(path),
            Some("ttl") => Turtle::parse(path),
            Some("rdf") => RdfXml::parse(path),
            Some(format) => Err(ParserError::NotSupportedFormat(format.to_string())),
            None => Err(ParserError::NoFormatProvided),
        }
    }
}
16 changes: 16 additions & 0 deletions src/io/ntriples.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use rio_turtle::NTriplesParser;
use rio_turtle::TurtleError;
use std::fs::File;
use std::io::BufReader;

use super::Backend;

/// N-Triples parser reading from a buffered file handle.
type NTriplesFileParser = NTriplesParser<BufReader<File>>;

/// Backend selector for files in the N-Triples format.
pub struct NTriples;

impl Backend<NTriplesFileParser, TurtleError> for NTriples {
    /// Builds the N-Triples parser that will consume `reader`.
    fn concrete_parser(reader: BufReader<File>) -> NTriplesFileParser {
        NTriplesFileParser::new(reader)
    }
}
16 changes: 16 additions & 0 deletions src/io/rdf_xml.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use rio_xml::RdfXmlError;
use rio_xml::RdfXmlParser;
use std::fs::File;
use std::io::BufReader;

use super::Backend;

/// RDF/XML parser reading from a buffered file handle.
type RdfXmlFileParser = RdfXmlParser<BufReader<File>>;

/// Backend selector for files in the RDF/XML format.
pub struct RdfXml;

impl Backend<RdfXmlFileParser, RdfXmlError> for RdfXml {
    /// Builds the RDF/XML parser that will consume `reader`.
    ///
    /// The second constructor argument is left as `None`.
    /// NOTE(review): presumably this is the base IRI; confirm against the rio_xml API.
    fn concrete_parser(reader: BufReader<File>) -> RdfXmlFileParser {
        RdfXmlFileParser::new(reader, None)
    }
}
16 changes: 16 additions & 0 deletions src/io/turtle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use rio_turtle::TurtleError;
use rio_turtle::TurtleParser;
use std::fs::File;
use std::io::BufReader;

use super::Backend;

/// Turtle parser reading from a buffered file handle.
type TurtleFileParser = TurtleParser<BufReader<File>>;

/// Backend selector for files in the Turtle format.
pub struct Turtle;

impl Backend<TurtleFileParser, TurtleError> for Turtle {
    /// Builds the Turtle parser that will consume `reader`.
    ///
    /// The second constructor argument is left as `None`.
    /// NOTE(review): presumably this is the base IRI; confirm against the rio_turtle API.
    fn concrete_parser(reader: BufReader<File>) -> TurtleFileParser {
        TurtleFileParser::new(reader, None)
    }
}
Loading

0 comments on commit 791e1c8

Please sign in to comment.