refactor Parquet file iterators; improved type hinting
ceteri committed Oct 2, 2022
1 parent 3b684f5 commit 218fbb7
Showing 4 changed files with 80 additions and 80 deletions.
4 changes: 2 additions & 2 deletions example.py
@@ -37,7 +37,7 @@ def load_parquet (
part_id = 0,
)

part.load_rows_parquet(parq_file)
part.parse_rows(part.iter_load_parquet(parq_file))

if debug:
ic(part)
@@ -62,7 +62,7 @@ def load_csv (
part_id = 0,
)

part.load_rows_csv(cloudpathlib.AnyPath(load_csv))
part.parse_rows(part.iter_load_csv(cloudpathlib.AnyPath(load_csv)))

if debug:
ic(part)
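The change above swaps the single-purpose loaders for a two-step pattern: an iterator produces rows, and parse_rows consumes them. A minimal sketch of the new call sites, assuming a local dat/recipes.parq path and that Partition accepts part_id as a keyword (both illustrative):

import cloudpathlib
import pyarrow.parquet as pq
from pynock import Partition

# Parquet: open the file, then feed its row iterator into the parser
part = Partition(part_id=0)
parq_file = pq.ParquetFile("dat/recipes.parq")
part.parse_rows(part.iter_load_parquet(parq_file))

# CSV: the same two-step pattern, with the CSV iterator
part = Partition(part_id=0)
part.parse_rows(part.iter_load_csv(cloudpathlib.AnyPath("dat/recipes.csv")))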
3 changes: 2 additions & 1 deletion pynock/__init__.py
@@ -6,4 +6,5 @@
efficiently in Python.
"""

from .pynock import Partition, Node, Edge, NOT_FOUND
from .pynock import GraphRow, PropMap, NOT_FOUND, \
Node, Edge, Partition
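With the widened exports, downstream code can annotate row-handling helpers using the same aliases the library uses internally. A hypothetical example (describe_row is not part of pynock):

from pynock import GraphRow, NOT_FOUND

def describe_row (row: GraphRow) -> str:
    # negative edge_id rows begin a node; the rest are edges (see parse_rows)
    kind = "node" if row.get("edge_id", NOT_FOUND) < 0 else "edge"
    return f"{kind}: {row.get('src_name')}"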
151 changes: 75 additions & 76 deletions pynock/pynock.py
@@ -20,6 +20,12 @@
import pyarrow.parquet as pq # type: ignore # pylint: disable=E0401


######################################################################
## non-class definitions

GraphRow = typing.Dict[str, typing.Any]
PropMap = typing.Dict[str, typing.Any]

NOT_FOUND: int = -1
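These aliases are ordinary typing shorthands, and widening prop maps to Any-valued dicts matches what json.loads actually returns. A small sketch of the difference (values here are illustrative):

import json
import typing

PropMap = typing.Dict[str, typing.Any]

# json.loads may return non-string values, which the previous
# Dict[str, str] annotation would flag under a type checker
props: PropMap = json.loads('{"weight": 0.8, "tags": ["vegan"]}')
assert props["weight"] == 0.8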


@@ -38,7 +44,7 @@ class Node (BaseModel): # pylint: disable=R0903
is_rdf: bool = True
label_set: typing.Set[str] = set()
truth: float = 1.0
prop_map: typing.Dict[str, str] = {}
prop_map: PropMap = {}
edge_map: typing.Dict[int, list] = {}


@@ -54,7 +60,7 @@ class Edge (BaseModel): # pylint: disable=R0903
rel: int = BLANK_RELATION
node_id: int = NOT_FOUND
truth: float = 1.0
prop_map: typing.Dict[str, str] = {}
prop_map: PropMap = {}


######################################################################
@@ -68,7 +74,7 @@ class Partition (BaseModel): # pylint: disable=R0903
next_node: int = 0
nodes: typing.Dict[int, Node] = {}
node_names: typing.Dict[str, int] = {}
edge_rels: typing.List[str] = [ "" ]
edge_rels: typing.List[str] = [""]


def create_node (
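Since Node, Edge, and Partition are pydantic models, the fields shown in these hunks can be set by keyword. An illustrative construction, assuming any fields not visible in this diff carry defaults:

from pynock import Edge, Node, Partition

node = Node(label_set={"Recipe"}, truth=1.0, prop_map={"title": "anchovy pizza"})
edge = Edge(rel=1, node_id=0, prop_map={"weight": 0.5})
part = Partition()

assert part.edge_rels[0] == ""  # edge_rels starts as [""], matching the BLANK_RELATION default
assert node.edge_map == {}      # edges attach as rows get parsed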
@@ -98,11 +104,11 @@ def load_props (
props: str,
*,
debug: bool = False, # pylint: disable=W0613
) -> typing.Dict[str, str]:
) -> PropMap:
"""
Load property pairs from a JSON string.
"""
prop_map: typing.Dict[str, str] = {}
prop_map: PropMap = {}

if props not in ("null", ""):
prop_map = json.loads(props)
@@ -113,7 +119,7 @@ def load_props (
@classmethod
def save_props (
cls,
prop_map: typing.Dict[str, str],
prop_map: PropMap,
*,
debug: bool = False, # pylint: disable=W0613
) -> str:
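Based on the bodies shown here, the props helpers round-trip between PropMap dicts and JSON strings, with "null" and "" treated as empty. A hedged sketch:

from pynock import Partition

prop_map = Partition.load_props('{"cuisine": "italian", "rating": 4}')
assert prop_map["rating"] == 4

# the sentinel inputs both yield an empty PropMap
assert Partition.load_props("null") == {}
assert Partition.load_props("") == {}

# save_props serializes back to a JSON string (assumed inverse of load_props)
assert Partition.load_props(Partition.save_props(prop_map)) == prop_map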
@@ -132,7 +138,7 @@ def save_props (

def populate_node (
self,
row: dict,
row: GraphRow,
*,
debug: bool = False, # pylint: disable=W0613
) -> Node:
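populate_node and populate_edge both take a GraphRow; the discriminator visible in this diff is edge_id, negative for a row that begins a new node. A sketch using only keys that appear in this diff (real rows carry more columns):

from pynock import GraphRow

node_row: GraphRow = {"edge_id": -1, "src_name": "recipe_123", "truth": 1.0}
edge_row: GraphRow = {"edge_id": 2, "src_name": "recipe_123", "truth": 1.0}

for row in (node_row, edge_row):
    kind = "node" if row["edge_id"] < 0 else "edge"
    print(kind, row["src_name"])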
@@ -173,7 +179,7 @@ def get_edge_rel (

def populate_edge (
self,
row: dict,
row: GraphRow,
node: Node,
*,
debug: bool = False, # pylint: disable=W0613
@@ -198,109 +204,102 @@ def populate_edge (


@classmethod
def iter_row_group (
def iter_load_parquet (
cls,
row_group: pyarrow.lib.Table, # pylint: disable=I1101
*,
debug: bool = False, # pylint: disable=W0613
) -> typing.Iterable:
"""
Iterate through the rows in a Parquet row group.
"""
for r_idx in range(row_group.num_rows):
row: dict = {}

for c_idx in range(row_group.num_columns):
try:
key: str = row_group.column_names[c_idx]
col: pyarrow.lib.ChunkedArray = row_group.column(c_idx) # pylint: disable=I1101
val: typing.Any = col[r_idx]
row[key] = val.as_py()
except IndexError as ex:
ic(ex, r_idx, c_idx)
return

if debug:
print()
ic(r_idx, row)

yield row


def load_rows_parquet (
self,
parq_file: pq.ParquetFile,
*,
debug: bool = False,
) -> None:
) -> typing.Iterable[typing.Tuple[int, GraphRow]]:
"""
Load a Parquet file and parse it rows into a graph partition.
Iterate through the rows in a Parquet file.
"""
for i in range(parq_file.num_row_groups):
row_group: pyarrow.lib.Table = parq_file.read_row_group(i) # pylint: disable=I1101
row_num: int = 0

for row in track(self.iter_row_group(row_group), description=f"row group {i}"):
# have we reached a row which begins a new node?
if row["edge_id"] < 0:
node = self.populate_node(row)
for batch in range(parq_file.num_row_groups):
row_group: pyarrow.lib.Table = parq_file.read_row_group(batch) # pylint: disable=I1101

if debug:
print()
ic(node)
for r_idx in range(row_group.num_rows):
row: GraphRow = {}

# otherwise this row is an edge for the most recent node
else:
assert row["src_name"] == node.name
# 'edge_id': 2,
for c_idx in range(row_group.num_columns):
try:
key: str = row_group.column_names[c_idx]
col: pyarrow.lib.ChunkedArray = row_group.column(c_idx) # pylint: disable=I1101
val: typing.Any = col[r_idx]
row[key] = val.as_py()
except IndexError as ex:
ic(ex, r_idx, c_idx)
return

edge = self.populate_edge(row, node)
if debug:
print()
ic(r_idx, row)

if debug:
ic(edge)
yield row_num, row
row_num += 1
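The new iter_load_parquet walks row groups with the public pyarrow API, converting each Arrow scalar to a Python value with as_py(). A standalone sketch of the same walk ("example.parq" is a placeholder path):

import pyarrow.parquet as pq

parq_file = pq.ParquetFile("example.parq")

for batch in range(parq_file.num_row_groups):
    row_group = parq_file.read_row_group(batch)

    for r_idx in range(row_group.num_rows):
        # rebuild one row as a plain dict of column name -> Python value
        row = {
            row_group.column_names[c_idx]: row_group.column(c_idx)[r_idx].as_py()
            for c_idx in range(row_group.num_columns)
        }
        print(r_idx, row)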


def load_rows_csv (
def iter_load_csv (
self,
csv_path: cloudpathlib.AnyPath,
*,
debug: bool = False,
) -> None:
) -> typing.Iterable[typing.Tuple[int, GraphRow]]:
"""
Load a CSV file and parse it rows into a graph partition.
Iterate through the rows in a CSV file.
"""
row_num: int = 0

with open(csv_path) as fp:
reader = csv.reader(fp, delimiter=",")
header = next(reader)

for row_val in reader:
row: typing.Dict[str, typing.Any] = dict(zip(header, row_val))
row: GraphRow = dict(zip(header, row_val))
row["edge_id"] = int(row["edge_id"])
row["is_rdf"] = bool(row["is_rdf"])
row["shadow"] = int(row["shadow"])
row["truth"] = float(row["truth"])

# have we reached a row which begins a new node?
if row["edge_id"] < 0:
node = self.populate_node(row)
yield row_num, row
row_num += 1
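Because both loaders now yield (row_num, row) tuples lazily, they can be consumed directly, for instance to peek at a few rows before committing to a full parse; the path here is a placeholder:

import cloudpathlib
from pynock import Partition

part = Partition(part_id=0)

for row_num, row in part.iter_load_csv(cloudpathlib.AnyPath("dat/recipes.csv")):
    print(row_num, row["edge_id"], row["src_name"])
    if row_num >= 2:
        break  # generators make early exit cheap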


def parse_rows (
self,
iter_load: typing.Iterable[typing.Tuple[int, GraphRow]],
*,
debug: bool = False,
) -> None:
"""
Parse a stream of rows to construct a graph partition.
"""
for row_num, row in track(iter_load, description=f"parse rows"):
# have we reached a row which begins a new node?
if row["edge_id"] < 0:
node: Node = self.populate_node(row)

if debug:
print()
ic(node)
if debug:
print()
ic(node)

# otherwise this row is an edge for the most recent node
else:
assert row["src_name"] == node.name
# 'edge_id': 2,
# validate the node/edge sequencing and consistency among the rows
if row["src_name"] != node.name:
error_node = row["src_name"]
message = f"|{ error_node }| out of sequence at row {row_num}"
raise ValueError(message)

edge = self.populate_edge(row, node)
# otherwise this row is an edge for the most recent node
else:
edge: Edge = self.populate_edge(row, node)

if debug:
ic(edge)
if debug:
ic(edge)
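The refactor also replaces the former assert on src_name with a ValueError that carries the row number, so malformed input can be handled rather than aborting. A hedged sketch (path illustrative):

import cloudpathlib
from pynock import Partition

part = Partition(part_id=0)

try:
    part.parse_rows(part.iter_load_csv(cloudpathlib.AnyPath("dat/recipes.csv")))
except ValueError as ex:
    print(f"malformed partition data: {ex}")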


def iter_gen_rows (
self,
) -> typing.Iterable:
) -> typing.Iterable[GraphRow]:
"""
Iterator for generating rows on writes.
"""
@@ -341,7 +340,7 @@ def save_file_parquet (
"""
Save a partition to a Parquet file.
"""
df = pd.DataFrame([ row for row in self.iter_gen_rows() ])
df = pd.DataFrame([row for row in self.iter_gen_rows()])
table = pa.Table.from_pandas(df)
writer = pq.ParquetWriter(save_parq.as_posix(), table.schema)
writer.write_table(table)
@@ -357,5 +356,5 @@ def save_file_csv (
"""
Save a partition to a CSV file.
"""
df = pd.DataFrame([ row for row in self.iter_gen_rows() ])
df = pd.DataFrame([row for row in self.iter_gen_rows()])
df.to_csv(save_csv.as_posix(), index=False)
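Putting the pieces together, a partition parsed from one format can be re-serialized in either; all paths here are placeholders:

import cloudpathlib
from pynock import Partition

part = Partition(part_id=0)
part.parse_rows(part.iter_load_csv(cloudpathlib.AnyPath("dat/recipes.csv")))

# re-serialize the partition in both formats
part.save_file_parquet(cloudpathlib.AnyPath("dat/recipes_copy.parq"))
part.save_file_csv(cloudpathlib.AnyPath("dat/recipes_copy.csv"))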
2 changes: 1 addition & 1 deletion tests/test_csv_file.py
@@ -38,7 +38,7 @@ def test_save_file_csv ():
part_id = 0,
)

part.load_rows_parquet(parq_file)
part.parse_rows(part.iter_load_parquet(parq_file))

# write the partition as a CSV file
part.save_file_csv(cloudpathlib.AnyPath(tmp_obs.name))
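A hypothetical companion test in the same style, assuming a dat/recipes.parq fixture (the path, test name, and assertion are illustrative):

import tempfile

import cloudpathlib
import pyarrow.parquet as pq
from pynock import Partition

def test_parquet_roundtrip_csv ():
    parq_file = pq.ParquetFile("dat/recipes.parq")
    part = Partition(part_id=0)
    part.parse_rows(part.iter_load_parquet(parq_file))

    with tempfile.NamedTemporaryFile(suffix=".csv") as tmp_obs:
        part.save_file_csv(cloudpathlib.AnyPath(tmp_obs.name))
        assert len(part.nodes) > 0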
