support parsing RDF files as graph input
ceteri committed Oct 2, 2022
1 parent 218fbb7 commit bb7793b
Showing 6 changed files with 170 additions and 21 deletions.
32 changes: 24 additions & 8 deletions README.md
@@ -1,22 +1,34 @@
# pynock

This library `pynock` provides examples for working with low-level
Parquet read/write efficiently in Python. The intent is to serialize
graphs which can align data representations for multiple areas of
popular graph technologies:
Parquet read/write efficiently in Python.

Our intent is to serialize graphs which align the data representations
required for multiple areas of popular graph technologies:

* semantic graphs (e.g., W3C)
* labeled property graphs (e.g., openCypher)
* probabilistic graphs (e.g., PSL)
* edge lists (e.g., NetworkX)

This approach also supports distributed partitions based on
Arrow/Parquet which can scale to very large (+1 T node) graphs.
This approach also supports distributed partitions based on Parquet
which can scale to very large (1T+ node) graphs.

For details about the formatting required in Parquet files, see the
[`FORMAT.md`](https://github.com/DerwenAI/pynock/blob/main/FORMAT.md)
page.
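
As a quick orientation, the CLI examples below wrap a small library-level API. Here is a minimal sketch of that flow, based on the `Partition` methods touched in this commit; the `from pynock import Partition` import path and the output file names are assumptions, so adjust them to match your install:

```python
import cloudpathlib
from pynock import Partition  # assumed import path

# build an empty partition, then stream rows into it from an RDF file
part = Partition(part_id=0)
part.parse_rows(part.iter_load_rdf(cloudpathlib.AnyPath("dat/tiny.ttl"), "ttl"))

# serialize the same partition back out as Parquet and/or CSV (illustrative paths)
part.save_file_parquet(cloudpathlib.AnyPath("tiny.parq"))
part.save_file_csv(cloudpathlib.AnyPath("tiny.csv"))
```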


## Caveats

Note that the `pynock` library does not provide any support for graph
computation or querying, merely for manipulating and validating
serialization formats.

Our intent is to provide examples where others from the broader open
source developer community can help troubleshoot edge cases in
Parquet.


## Dependencies

@@ -40,10 +52,14 @@ python3 -m pip install -U pip wheel
python3 -m pip install -r requirements.txt
```

Then to run the example code from CLI:
Then to run examples from CLI:

```
python3 example.py load-parq --file dat/recipes.parq --debug
```

```
python3 example.py load-parquet --file dat/recipes.parq --debug
python3 example.py load-rdf --file dat/tiny.ttl --save-csv foo.csv
```

For further information:
@@ -72,7 +88,7 @@ A `nock` is the English word for the end of an arrow opposite its point.

## Background

For more details about using Arrow/Parquet see:
For more details about using Arrow and Parquet see:

["Apache Arrow homepage"](https://arrow.apache.org/)

Binary file added dat/tiny.parq
7 changes: 7 additions & 0 deletions dat/tiny.ttl
@@ -0,0 +1,7 @@
@prefix ind: <http://purl.org/heals/ingredient/> .
@prefix wtm: <http://purl.org/heals/food/> .

<https://www.food.com/recipe/327593> a wtm:Recipe ;
wtm:hasIngredient ind:ChickenEgg,
ind:CowMilk,
ind:WholeWheatFlour .
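
For reference, this tiny graph amounts to four triples: one `rdf:type` statement plus three `wtm:hasIngredient` edges. A hypothetical sanity check, not part of this commit, could parse it the same way `Partition.iter_load_rdf` does further below:

```python
import kglab

# load the Turtle file through kglab, mirroring the new RDF loading path
kg = kglab.KnowledgeGraph()
kg.load_rdf("dat/tiny.ttl", format="ttl")

# iterate the underlying rdflib graph; expect four (subject, predicate, object) triples
for subj, pred, objt in kg.rdf_graph():
    print(subj, pred, objt)
```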
41 changes: 36 additions & 5 deletions example.py
@@ -15,8 +15,8 @@
APP = typer.Typer()


@APP.command()
def load_parquet (
@APP.command("load-parq")
def cli_load_parq (
    *,
    load_parq: str = typer.Option(..., "--file", "-f", help="input Parquet file"),
    save_csv: str = typer.Option(None, "--save-csv", help="output as CSV"),
@@ -47,11 +47,11 @@ def load_parquet (
        part.save_file_csv(cloudpathlib.AnyPath(save_csv))


@APP.command()
def load_csv (
@APP.command("load-csv")
def cli_load_csv (
    *,
    load_csv: str = typer.Option(..., "--file", "-f", help="input CSV file"),
    save_parq: str = typer.Option(None, "--save-parq", help="output Parquet"),
    save_parq: str = typer.Option(None, "--save-parq", help="output as Parquet"),
    debug: bool = False,
) -> None:
    """
@@ -67,9 +67,40 @@ def load_csv (
    if debug:
        ic(part)

    # next, handle the output options
    if save_parq is not None:
        part.save_file_parquet(cloudpathlib.AnyPath(save_parq))


@APP.command("load-rdf")
def cli_load_rdf (
*,
load_rdf: str = typer.Option(..., "--file", "-f", help="input RDF file"),
rdf_format: str = typer.Option("ttl", "--format", help="RDF format: ttl, rdf, jsonld, etc."),
save_parq: str = typer.Option(None, "--save-parq", help="output as Parquet"),
save_csv: str = typer.Option(None, "--save-csv", help="output as CSV"),
debug: bool = False,
) -> None:
"""
Load an RDF file into a graph partition, optionally converting and
saving to different formats.
"""
part: Partition = Partition(
part_id = 0,
)

part.parse_rows(part.iter_load_rdf(cloudpathlib.AnyPath(load_rdf), rdf_format))

if debug:
ic(part)

# next, handle the output options
if save_parq is not None:
part.save_file_parquet(cloudpathlib.AnyPath(save_parq))

if save_csv is not None:
part.save_file_csv(cloudpathlib.AnyPath(save_csv))


if __name__ == "__main__":
    APP()
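
One way to exercise the new command without shelling out is Typer's test runner. This is a hypothetical smoke test, not part of the commit, and it assumes `example.py` is importable from the repository root:

```python
from typer.testing import CliRunner

from example import APP

runner = CliRunner()

# invoke the new load-rdf command against the sample Turtle file added in this commit
result = runner.invoke(APP, ["load-rdf", "--file", "dat/tiny.ttl", "--save-parq", "foo.parq"])
assert result.exit_code == 0
```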
110 changes: 102 additions & 8 deletions pynock/pynock.py
@@ -8,12 +8,14 @@

import csv
import json
import sys
import typing

from icecream import ic # type: ignore # pylint: disable=E0401
from pydantic import BaseModel # pylint: disable=E0401,E0611
from rich.progress import track # pylint: disable=E0401
import cloudpathlib
import kglab
import pandas as pd
import pyarrow as pa # type: ignore # pylint: disable=E0401
import pyarrow.lib # type: ignore # pylint: disable=E0401
@@ -70,25 +72,44 @@ class Partition (BaseModel): # pylint: disable=R0903
"""
Representing a partition in the graph.
"""
PROPS_NULL: typing.ClassVar[str] = "null"

part_id: int = NOT_FOUND
next_node: int = 0
nodes: typing.Dict[int, Node] = {}
node_names: typing.Dict[str, int] = {}
edge_rels: typing.List[str] = [""]


    def create_node (
    def lookup_node (
        self,
        node_name: str,
        *,
        debug: bool = False, # pylint: disable=W0613
    ) -> typing.Optional[Node]:
        """
        Lookup a node, return None if not found.
        """
        if node_name in self.node_names:
            return self.nodes[self.node_names[node_name]]

        return None


    def create_node_name (
        self,
        node_name: str,
        *,
        debug: bool = False, # pylint: disable=W0613
    ) -> int:
        """
        Create a node, looking up first to avoid duplicates.
        Create a name for a new node in the namespace, looking up first to avoid duplicates.
        """
        node_id: int = NOT_FOUND

        if node_name in self.node_names:
        if node_name in [None, ""]:
            raise ValueError(f"node name cannot be null |{ node_name }|")
        elif node_name in self.node_names:
            node_id = self.node_names[node_name]
        else:
            node_id = self.next_node
@@ -110,7 +131,7 @@ def load_props (
"""
prop_map: PropMap = {}

if props not in ("null", ""):
if props not in (cls.PROPS_NULL, ""):
prop_map = json.loads(props)

return prop_map
@@ -126,7 +147,7 @@ def save_props (
"""
Save property pairs to a JSON string.
"""
props: str = "null"
props: str = cls.PROPS_NULL

if len(prop_map) > 0:
props = json.dumps(prop_map)
@@ -145,6 +166,7 @@ def populate_node (
"""
Populate a Node object from the given Parquet row data.
"""
# create a src node
node: Node = Node(
name = row["src_name"],
truth = row["truth"],
@@ -154,8 +176,9 @@ def populate_node (
            prop_map = self.load_props(row["props"]),
        )

        node.node_id = self.create_node_name(node.name)

        # add this node to the global list
        node.node_id = self.create_node(node.name)
        self.nodes[node.node_id] = node

        return node
@@ -187,10 +210,25 @@ def populate_edge (
"""
Populate an Edge object from the given Parquet row data.
"""
# first, lookup the dst node and create if needed
dst_name: str = row["dst_name"]
dst_node: typing.Optional[Node] = self.lookup_node(dst_name)

if dst_node is None:
dst_node = Node(
node_id = self.create_node_name(dst_name),
name = dst_name,
truth = row["truth"],
is_rdf = row["is_rdf"],
)

self.nodes[dst_node.node_id] = dst_node

# create the edge
edge: Edge = Edge(
rel = self.get_edge_rel(row["rel_name"], create=True),
truth = row["truth"],
node_id = self.create_node(row["dst_name"]),
node_id = dst_node.node_id,
prop_map = self.load_props(row["props"]),
)

@@ -265,6 +303,62 @@ def iter_load_csv (
            row_num += 1


    def iter_load_rdf (
        self,
        rdf_path: cloudpathlib.AnyPath,
        rdf_format: str,
        *,
        debug: bool = False,
    ) -> typing.Iterable[typing.Tuple[int, GraphRow]]:
        """
        Iterate through the rows implied by an RDF file.
        """
        row_num: int = 0

        kg = kglab.KnowledgeGraph()
        kg.load_rdf(rdf_path, format=rdf_format)

        for subj, pred, objt in kg.rdf_graph():
            if debug:
                ic(subj, pred, objt)

            # node representation for a triple
            row: GraphRow = {}
            row["src_name"] = str(subj)
            row["truth"] = 1.0
            row["edge_id"] = NOT_FOUND
            row["rel_name"] = None
            row["dst_name"] = None
            row["is_rdf"] = True
            row["shadow"] = Node.BASED_LOCAL
            row["labels"] = ""
            row["props"] = self.PROPS_NULL

            if debug:
                ic("node", subj, row_num, row)

            yield row_num, row
            row_num += 1

            # edge representation for a triple
            row = {}
            row["src_name"] = str(subj)
            row["truth"] = 1.0
            row["edge_id"] = 1
            row["rel_name"] = str(pred)
            row["dst_name"] = str(objt)
            row["is_rdf"] = True
            row["shadow"] = Node.BASED_LOCAL
            row["labels"] = ""
            row["props"] = self.PROPS_NULL

            if debug:
                ic("edge", objt, row_num, row)

            yield row_num, row
            row_num += 1
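
    # For illustration (not part of this commit): given the triple
    #   <https://www.food.com/recipe/327593> wtm:hasIngredient ind:ChickenEgg
    # from dat/tiny.ttl, iter_load_rdf() yields two rows, first a node row:
    #   {"src_name": "https://www.food.com/recipe/327593", "truth": 1.0,
    #    "edge_id": NOT_FOUND, "rel_name": None, "dst_name": None,
    #    "is_rdf": True, "shadow": Node.BASED_LOCAL, "labels": "", "props": "null"}
    # then an edge row:
    #   {"src_name": "https://www.food.com/recipe/327593", "truth": 1.0,
    #    "edge_id": 1, "rel_name": "http://purl.org/heals/food/hasIngredient",
    #    "dst_name": "http://purl.org/heals/ingredient/ChickenEgg",
    #    "is_rdf": True, "shadow": Node.BASED_LOCAL, "labels": "", "props": "null"}
    # so parse_rows() can consume RDF input the same way it consumes CSV rows.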


    def parse_rows (
        self,
        iter_load: typing.Iterable[typing.Tuple[int, GraphRow]],
@@ -284,7 +378,7 @@ def parse_rows (
                ic(node)

            # validate the node/edge sequencing and consistency among the rows
            if row["src_name"] != node.name:
            elif row["src_name"] != node.name:
                error_node = row["src_name"]
                message = f"|{ error_node }| out of sequence at row {row_num}"
                raise ValueError(message)
1 change: 1 addition & 0 deletions requirements.txt
@@ -1,5 +1,6 @@
cloudpathlib >= 0.10
icecream >= 2.1
kglab >= 0.6
pandas >= 1.4
pydantic >= 1.10
pyarrow >= 6.0
