diff --git a/utils/hierarchy_handler.py b/utils/hierarchy_handler.py index ffef42b7..f468fe5b 100644 --- a/utils/hierarchy_handler.py +++ b/utils/hierarchy_handler.py @@ -1,7 +1,12 @@ -from typing import List, Optional, Protocol, Tuple +from copy import deepcopy +from itertools import groupby +from typing import Any, List, Optional, Protocol, Tuple import networkx as nx +Cluster = set[str] + + class PSection(Protocol): def get_name(self) -> str: ... @@ -12,6 +17,9 @@ def __lt__(self, other: "PSection") -> bool: def __gt__(self, other: "PSection") -> bool: ... + def __eq__(self, other: Any) -> bool: + ... + class HierarchyFactory: def __init__(self): @@ -73,49 +81,94 @@ def add_to_path( self.main_graph.add_edge(given_name, path_segments[-1]) # (automated) construction - def _check_branch_consistency(self, branch: str) -> bool: + def _check_graph_consistency(self, graph: nx.DiGraph) -> None: # branches are expected to be internally ordered # raise an error when __lt__ or __gt__ is not implemented - pass - - def _get_extremity(self, branch: str, head: bool=True) -> str: - pass - - def _move_along_branch(self, branch: str, up: bool=True) -> str: - # yield the next node (name) along the branch - pass - - def _attach_branch(self, branch_1: str, branch_2_head: str) -> str: - # create an edge from branch_2 head to branch_1 - pass - - def _insert_branch(self, branch_1: str, branch_2_head: str) -> str: - # remove the edge between branch_1 and [branch_1 + dn] - # link branch_2_head to branch_1 - # link branch_2_tail to [branch_1 + dn] - pass - - def _merge_branch(self, branch_1: str, branch_2: str) -> str: - # compare their tail_1 / head_2 (node with no incoming/outgoing edges) - # if head_2 < tail_1, add an edge from head_2 to tail_1 - # if not, repeat the process with head_2 and [tail_1 + 1 * up] - # if a hit, check whether head_2 > [tail_1 + 1 * down], decide on attaching or inserting - pass - - def _automated_construct(self) -> None: + for node in graph.nodes(): + try: + if graph.nodes[node]["section"] < graph.nodes[node]["section"]: + raise ValueError( + f"Node {node} does not implement the __lt__ method correctly." + ) + if graph.nodes[node]["section"] > graph.nodes[node]["section"]: + raise ValueError( + f"Node {node} does not implement the __gt__ method correctly." + ) + if not (graph.nodes[node]["section"] == graph.nodes[node]["section"]): + raise ValueError( + f"Node {node} does not implement the __eq__ method correctly." + ) + except TypeError: + raise ValueError(f"Node {node} does not implement the comparison methods.") + + for edge in graph.edges(): + if not (graph.nodes[edge[0]]["section"] < graph.nodes[edge[1]]["section"]): + raise ValueError( + f"Graph is inconsistently ordered: {edge[0]} is not less than or equal {edge[1]}." + ) + # Note: this does not check for comparison interoperability between different node types + + # TODO: consider handler for cycles: splitting them + + def _get_extremities(self, graph: nx.DiGraph, head: bool = True) -> list[str]: + if head: + return [node for node, degree in graph.out_degree() if degree == 0] + return [node for node, degree in graph.in_degree() if degree == 0] + + def _move_along_branch(self, branch_node_name: str, up: bool = True) -> list[str]: + if up: + return list(self.temp_graph.predecessors(branch_node_name)) + return list(self.temp_graph.successors(branch_node_name)) + + def _attach_branch( + self, + larger_branch_current: str, + larger_branch_prev: Optional[str], + smaller_branch_head: str, + ) -> None: + """Attach the smaller branch to the larger branch, according to the following rules: + 1. both branches are left structurally intact. + 2. the smaller branch head is attached at the smallest node that is still larger than itself. + The search for the attachment point is done recursively in a depth-first manner. + """ + if larger_branch_prev is not None: + if smaller_branch_head >= larger_branch_current: + self.main_graph.add_edge(smaller_branch_head, larger_branch_prev) + return + + for larger_branch_next in self._move_along_branch( + larger_branch_current, up=False + ): + self._attach_branch( + larger_branch_next, larger_branch_current, smaller_branch_head + ) + + def _sort_heads(self, heads: list[str]) -> list[list[str]]: + comparison_key = lambda head: self.temp_graph.nodes[head]["section"] + return [ + list(group) + for _, group in groupby(sorted(heads, comparison_key), key=comparison_key) + ] + + def _automated_construct(self, graph: nx.DiGraph) -> None: """Automatically constructs the temporary graph into the main graph. This procedure relies on the `PSection` comparison method `__lt__()` and `__gt__()`.""" - # split the branch space into 2 parts - # stop when there is only 1 branch left - pass + sorted_heads = self._sort_heads(self._get_extremities(graph, head=True)) + for smaller_branch_head, larger_branch_head in zip( + sorted_heads[:-1], sorted_heads[1:] + ): + self._attach_branch(larger_branch_head, None, smaller_branch_head) def construct(self) -> None: """Constructs the temporary graph into the main graph.""" # check if the temporary graph is a valid tree + # raise an error indicating missing links and/or cycles if not # migrate the temporary graph to the main graph if so # rely on `self._automated_construct` to do the heavy lifting - # raise an error indicating missing links and/or cycles if not - pass + self.main_graph = deepcopy(self.temp_graph) + if not nx.is_weakly_connected(self.temp_graph): + self._check_graph_consistency(self.temp_graph) + self._automated_construct(list(nx.weakly_connected_components(self.main_graph))) # navigation def cd(self, path: Optional[str]) -> None: @@ -141,7 +194,6 @@ def ls(self, path: Optional[str] = None) -> List[str]: if self._seg_to_graph(self._split_path(path)) in self.main_graph: return list(self.main_graph.successors(path)) - # searching def _match_pattern(self, node: str, pattern_parts: List[str], index: int) -> bool: """Recursively checks if the node path matches the pattern."""