Commit 3b684f5: load from CSV

ceteri committed Oct 2, 2022 (1 parent: ca5d7ef)
Showing 3 changed files with 74 additions and 22 deletions.
40 changes: 26 additions & 14 deletions example.py
@@ -20,25 +20,24 @@ def load_parquet (
     *,
     load_parq: str = typer.Option(..., "--file", "-f", help="input Parquet file"),
     save_csv: str = typer.Option(None, "--save-csv", help="output as CSV"),
-    save_parq: str = typer.Option(None, "--save-parq", help="output Parquet"),
-    save_ttl: str = typer.Option(None, "--save-ttl", help="output TTL"),
     debug: bool = False,
     ) -> None:
     """
-Load a Parquet file into a graph partition.
+Load a Parquet file into a graph partition, optionally converting and
+saving to different formats.
     """
-    pq_file: pq.ParquetFile = pq.ParquetFile(load_parq)
+    parq_file: pq.ParquetFile = pq.ParquetFile(load_parq)

     if debug:
-        ic(pq_file.metadata)
-        ic(pq_file.schema)
-        ic(type(pq_file.schema))
+        ic(parq_file.metadata)
+        ic(parq_file.schema)
+        ic(type(parq_file.schema))

     part: Partition = Partition(
         part_id = 0,
         )

-    part.load_rows(pq_file)
+    part.load_rows_parquet(parq_file)

     if debug:
         ic(part)
@@ -47,16 +46,29 @@ def load_parquet (
     if save_csv is not None:
         part.save_file_csv(cloudpathlib.AnyPath(save_csv))

-    if save_parq is not None:
-        part.save_file_parquet(cloudpathlib.AnyPath(save_parq))
-

 @APP.command()
-def load_csv ():
+def load_csv (
+    *,
+    load_csv: str = typer.Option(..., "--file", "-f", help="input CSV file"),
+    save_parq: str = typer.Option(None, "--save-parq", help="output Parquet"),
+    debug: bool = False,
+    ) -> None:
     """
-Wherein we do stuff with CSV files.
+Load a CSV file into a graph partition, optionally converting and
+saving to different formats.
     """
-    pass
+    part: Partition = Partition(
+        part_id = 0,
+        )
+
+    part.load_rows_csv(cloudpathlib.AnyPath(load_csv))
+
+    if debug:
+        ic(part)
+
+    if save_parq is not None:
+        part.save_file_parquet(cloudpathlib.AnyPath(save_parq))


 if __name__ == "__main__":
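For a quick smoke test of the revised CLI, the sketch below drives the new command through typer's test runner. This is a minimal sketch, not part of the commit: it assumes `APP` is the `typer.Typer` instance defined in example.py, that Typer's default underscore-to-hyphen naming yields the command name `load-csv`, and that `dat/recipes.csv` is a hypothetical CSV export of the sample partition.

    # minimal sketch, not in this commit: exercise load-csv end-to-end
    from typer.testing import CliRunner

    from example import APP

    runner = CliRunner()

    # load the CSV partition, then write it back out as Parquet
    result = runner.invoke(
        APP,
        ["load-csv", "--file", "dat/recipes.csv", "--save-parq", "/tmp/recipes.parq"],
    )

    assert result.exit_code == 0, result.output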
50 changes: 45 additions & 5 deletions pynock/pynock.py
@@ -6,6 +6,7 @@
 efficiently in Python.
 """

+import csv
 import json
 import typing

@@ -226,17 +227,17 @@ def iter_row_group (
             yield row


-    def load_rows (
+    def load_rows_parquet (
         self,
-        pq_file: pq.ParquetFile,
+        parq_file: pq.ParquetFile,
         *,
         debug: bool = False,
         ) -> None:
         """
-Load and parse all of the Parquet rows into a graph partition.
+Load a Parquet file and parse its rows into a graph partition.
         """
-        for i in range(pq_file.num_row_groups):
-            row_group: pyarrow.lib.Table = pq_file.read_row_group(i)  # pylint: disable=I1101
+        for i in range(parq_file.num_row_groups):
+            row_group: pyarrow.lib.Table = parq_file.read_row_group(i)  # pylint: disable=I1101

             for row in track(self.iter_row_group(row_group), description=f"row group {i}"):
                 # have we reached a row which begins a new node?
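As an aside, the loop above follows the standard pyarrow pattern of scanning a Parquet file one row group at a time, which keeps memory bounded on large partitions. A standalone sketch, assuming the `dat/recipes.parq` sample that ships with the repo:

    # read a Parquet file row group by row group, independent of pynock
    import pyarrow.parquet as pq

    parq_file = pq.ParquetFile("dat/recipes.parq")

    for i in range(parq_file.num_row_groups):
        # each row group arrives as a pyarrow Table
        row_group = parq_file.read_row_group(i)

        # iterate its rows as plain dicts
        for row in row_group.to_pylist():
            print(row)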
@@ -258,6 +259,45 @@ def load_rows (
                     ic(edge)


+    def load_rows_csv (
+        self,
+        csv_path: cloudpathlib.AnyPath,
+        *,
+        debug: bool = False,
+        ) -> None:
+        """
+Load a CSV file and parse its rows into a graph partition.
+        """
+        with open(csv_path) as fp:
+            reader = csv.reader(fp, delimiter=",")
+            header = next(reader)
+
+            for row_val in reader:
+                row: typing.Dict[str, typing.Any] = dict(zip(header, row_val))
+                row["edge_id"] = int(row["edge_id"])
+                # bool() on a non-empty string is always True, so parse
+                # the serialized boolean against its text form instead
+                row["is_rdf"] = (row["is_rdf"].lower() == "true")
+                row["shadow"] = int(row["shadow"])
+                row["truth"] = float(row["truth"])
+
+                # have we reached a row which begins a new node?
+                if row["edge_id"] < 0:
+                    node = self.populate_node(row)
+
+                    if debug:
+                        print()
+                        ic(node)
+
+                # otherwise this row is an edge for the most recent node
+                else:
+                    assert row["src_name"] == node.name
+
+                    edge = self.populate_edge(row, node)
+
+                    if debug:
+                        ic(edge)
+
+

     def iter_gen_rows (
         self,
         ) -> typing.Iterable:
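One subtlety in the new load_rows_csv: every field comes back from csv.reader as a string, so the coercion step must parse booleans explicitly, since Python's bool() returns True for any non-empty string, including "False". A standalone sketch of that parsing, using hypothetical sample rows with the column names referenced above:

    # why the boolean column needs an explicit parse
    import csv

    # hypothetical sample: a node row (edge_id < 0) then an edge row
    rows = [
        "src_name,edge_id,is_rdf,shadow,truth",
        "gorm,-1,False,-1,1.0",
        "gorm,0,True,-1,1.0",
    ]

    reader = csv.reader(rows)
    header = next(reader)

    for row_val in reader:
        row = dict(zip(header, row_val))

        # bool() on a non-empty string is always True, even bool("False")
        assert bool("False") is True

        # so compare against the serialized text form instead
        row["is_rdf"] = (row["is_rdf"].lower() == "true")
        row["edge_id"] = int(row["edge_id"])

        # rows with edge_id < 0 begin a new node; the rest are its edges
        kind = "node" if row["edge_id"] < 0 else "edge"
        print(kind, row)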
6 changes: 3 additions & 3 deletions tests/test_csv_file.py
@@ -25,10 +25,10 @@ def test_save_file_csv ():

     try:
         load_parq: str = "dat/recipes.parq"
-        pq_file: pq.ParquetFile = pq.ParquetFile(load_parq)
+        parq_file: pq.ParquetFile = pq.ParquetFile(load_parq)

         # leverage Arrow to convert the "exp" baseline
-        for batch in pq_file.iter_batches():
+        for batch in parq_file.iter_batches():
             df = batch.to_pandas()
             df.to_csv(tmp_exp.name, index=False)
             break
@@ -38,7 +38,7 @@ def test_save_file_csv ():
             part_id = 0,
             )

-        part.load_rows(pq_file)
+        part.load_rows_parquet(parq_file)

         # write the partition as a CSV file
         part.save_file_csv(cloudpathlib.AnyPath(tmp_obs.name))
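A natural follow-up would be a companion test exercising load_rows_csv directly. The sketch below is hypothetical, not part of this commit: it borrows the names from the diff above, assumes Partition is importable from pynock as in the existing tests, and uses pytest's tmp_path fixture for scratch files.

    # hypothetical companion test: round-trip through load_rows_csv
    import cloudpathlib
    import pyarrow.parquet as pq

    from pynock import Partition

    def test_load_rows_csv (tmp_path):
        # convert the Parquet sample to CSV, as in test_save_file_csv
        csv_path = tmp_path / "recipes.csv"
        parq_file = pq.ParquetFile("dat/recipes.parq")

        for batch in parq_file.iter_batches():
            batch.to_pandas().to_csv(csv_path, index=False)
            break

        # load the CSV into a fresh partition
        part = Partition(part_id=0)
        part.load_rows_csv(cloudpathlib.AnyPath(str(csv_path)))

        # write it back out and confirm the round trip produced content
        obs_path = tmp_path / "obs.csv"
        part.save_file_csv(cloudpathlib.AnyPath(str(obs_path)))

        assert obs_path.read_text().strip() != ""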