diff --git a/example.py b/example.py index 96ece0d..0ca041a 100644 --- a/example.py +++ b/example.py @@ -37,7 +37,7 @@ def load_parquet ( part_id = 0, ) - part.load_rows_parquet(parq_file) + part.parse_rows(part.iter_load_parquet(parq_file)) if debug: ic(part) @@ -62,7 +62,7 @@ def load_csv ( part_id = 0, ) - part.load_rows_csv(cloudpathlib.AnyPath(load_csv)) + part.parse_rows(part.iter_load_csv(cloudpathlib.AnyPath(load_csv))) if debug: ic(part) diff --git a/pynock/__init__.py b/pynock/__init__.py index 8b472d0..3d799b1 100644 --- a/pynock/__init__.py +++ b/pynock/__init__.py @@ -6,4 +6,5 @@ efficiently in Python. """ -from .pynock import Partition, Node, Edge, NOT_FOUND +from .pynock import GraphRow, PropMap, NOT_FOUND, \ + Node, Edge, Partition diff --git a/pynock/pynock.py b/pynock/pynock.py index 698ab56..d45d1cb 100644 --- a/pynock/pynock.py +++ b/pynock/pynock.py @@ -20,6 +20,12 @@ import pyarrow.parquet as pq # type: ignore # pylint: disable=E0401 +###################################################################### +## non-class definitions + +GraphRow = typing.Dict[str, typing.Any] +PropMap = typing.Dict[str, typing.Any] + NOT_FOUND: int = -1 @@ -38,7 +44,7 @@ class Node (BaseModel): # pylint: disable=R0903 is_rdf: bool = True label_set: typing.Set[str] = set() truth: float = 1.0 - prop_map: typing.Dict[str, str] = {} + prop_map: PropMap = {} edge_map: typing.Dict[int, list] = {} @@ -54,7 +60,7 @@ class Edge (BaseModel): # pylint: disable=R0903 rel: int = BLANK_RELATION node_id: int = NOT_FOUND truth: float = 1.0 - prop_map: typing.Dict[str, str] = {} + prop_map: PropMap = {} ###################################################################### @@ -68,7 +74,7 @@ class Partition (BaseModel): # pylint: disable=R0903 next_node: int = 0 nodes: typing.Dict[int, Node] = {} node_names: typing.Dict[str, int] = {} - edge_rels: typing.List[str] = [ "" ] + edge_rels: typing.List[str] = [""] def create_node ( @@ -98,11 +104,11 @@ def load_props ( props: str, *, debug: bool = False, # pylint: disable=W0613 - ) -> typing.Dict[str, str]: + ) -> PropMap: """ Load property pairs from a JSON string. """ - prop_map: typing.Dict[str, str] = {} + prop_map: PropMap = {} if props not in ("null", ""): prop_map = json.loads(props) @@ -113,7 +119,7 @@ def load_props ( @classmethod def save_props ( cls, - prop_map: typing.Dict[str, str], + prop_map: PropMap, *, debug: bool = False, # pylint: disable=W0613 ) -> str: @@ -132,7 +138,7 @@ def save_props ( def populate_node ( self, - row: dict, + row: GraphRow, *, debug: bool = False, # pylint: disable=W0613 ) -> Node: @@ -173,7 +179,7 @@ def get_edge_rel ( def populate_edge ( self, - row: dict, + row: GraphRow, node: Node, *, debug: bool = False, # pylint: disable=W0613 @@ -198,109 +204,102 @@ def populate_edge ( @classmethod - def iter_row_group ( + def iter_load_parquet ( cls, - row_group: pyarrow.lib.Table, # pylint: disable=I1101 - *, - debug: bool = False, # pylint: disable=W0613 - ) -> typing.Iterable: - """ -Iterate through the rows in a Parquet row group. - """ - for r_idx in range(row_group.num_rows): - row: dict = {} - - for c_idx in range(row_group.num_columns): - try: - key: str = row_group.column_names[c_idx] - col: pyarrow.lib.ChunkedArray = row_group.column(c_idx) # pylint: disable=I1101 - val: typing.Any = col[r_idx] - row[key] = val.as_py() - except IndexError as ex: - ic(ex, r_idx, c_idx) - return - - if debug: - print() - ic(r_idx, row) - - yield row - - - def load_rows_parquet ( - self, parq_file: pq.ParquetFile, *, debug: bool = False, - ) -> None: + ) -> typing.Iterable[typing.Tuple[int, GraphRow]]: """ -Load a Parquet file and parse it rows into a graph partition. +Iterate through the rows in a Parquet file. """ - for i in range(parq_file.num_row_groups): - row_group: pyarrow.lib.Table = parq_file.read_row_group(i) # pylint: disable=I1101 + row_num: int = 0 - for row in track(self.iter_row_group(row_group), description=f"row group {i}"): - # have we reached a row which begins a new node? - if row["edge_id"] < 0: - node = self.populate_node(row) + for batch in range(parq_file.num_row_groups): + row_group: pyarrow.lib.Table = parq_file.read_row_group(batch) # pylint: disable=I1101 - if debug: - print() - ic(node) + for r_idx in range(row_group.num_rows): + row: GraphRow = {} - # otherwise this row is an edge for the most recent node - else: - assert row["src_name"] == node.name - # 'edge_id': 2, + for c_idx in range(row_group.num_columns): + try: + key: str = row_group.column_names[c_idx] + col: pyarrow.lib.ChunkedArray = row_group.column(c_idx) # pylint: disable=I1101 + val: typing.Any = col[r_idx] + row[key] = val.as_py() + except IndexError as ex: + ic(ex, r_idx, c_idx) + return - edge = self.populate_edge(row, node) + if debug: + print() + ic(r_idx, row) - if debug: - ic(edge) + yield row_num, row + row_num += 1 - def load_rows_csv ( + def iter_load_csv ( self, csv_path: cloudpathlib.AnyPath, *, debug: bool = False, - ) -> None: + ) -> typing.Iterable[typing.Tuple[int, GraphRow]]: """ -Load a CSV file and parse it rows into a graph partition. +Iterate through the rows in a CSV file. """ + row_num: int = 0 + with open(csv_path) as fp: reader = csv.reader(fp, delimiter=",") header = next(reader) for row_val in reader: - row: typing.Dict[str, typing.Any] = dict(zip(header, row_val)) + row: GraphRow = dict(zip(header, row_val)) row["edge_id"] = int(row["edge_id"]) row["is_rdf"] = bool(row["is_rdf"]) row["shadow"] = int(row["shadow"]) row["truth"] = float(row["truth"]) - # have we reached a row which begins a new node? - if row["edge_id"] < 0: - node = self.populate_node(row) + yield row_num, row + row_num += 1 + + + def parse_rows ( + self, + iter_load: typing.Iterable[typing.Tuple[int, GraphRow]], + *, + debug: bool = False, + ) -> None: + """ +Parse a stream of rows to construct a graph partition. + """ + for row_num, row in track(iter_load, description=f"parse rows"): + # have we reached a row which begins a new node? + if row["edge_id"] < 0: + node: Node = self.populate_node(row) - if debug: - print() - ic(node) + if debug: + print() + ic(node) - # otherwise this row is an edge for the most recent node - else: - assert row["src_name"] == node.name - # 'edge_id': 2, + # validate the node/edge sequencing and consistency among the rows + if row["src_name"] != node.name: + error_node = row["src_name"] + message = f"|{ error_node }| out of sequence at row {row_num}" + raise ValueError(message) - edge = self.populate_edge(row, node) + # otherwise this row is an edge for the most recent node + else: + edge: Edge = self.populate_edge(row, node) - if debug: - ic(edge) + if debug: + ic(edge) def iter_gen_rows ( self, - ) -> typing.Iterable: + ) -> typing.Iterable[GraphRow]: """ Iterator for generating rows on writes. """ @@ -341,7 +340,7 @@ def save_file_parquet ( """ Save a partition to a Parquet file. """ - df = pd.DataFrame([ row for row in self.iter_gen_rows() ]) + df = pd.DataFrame([row for row in self.iter_gen_rows()]) table = pa.Table.from_pandas(df) writer = pq.ParquetWriter(save_parq.as_posix(), table.schema) writer.write_table(table) @@ -357,5 +356,5 @@ def save_file_csv ( """ Save a partition to a CSV file. """ - df = pd.DataFrame([ row for row in self.iter_gen_rows() ]) + df = pd.DataFrame([row for row in self.iter_gen_rows()]) df.to_csv(save_csv.as_posix(), index=False) diff --git a/tests/test_csv_file.py b/tests/test_csv_file.py index a78e8ce..419a1a2 100644 --- a/tests/test_csv_file.py +++ b/tests/test_csv_file.py @@ -38,7 +38,7 @@ def test_save_file_csv (): part_id = 0, ) - part.load_rows_parquet(parq_file) + part.parse_rows(part.iter_load_parquet(parq_file)) # write the partition as a CSV file part.save_file_csv(cloudpathlib.AnyPath(tmp_obs.name))