Skip to content

Commit

Permalink
Polish automated constructor
Browse files Browse the repository at this point in the history
TODO:
1. Add testing
2. Extend documentation
  • Loading branch information
ndaelman committed Feb 2, 2024
1 parent 83cce5c commit bbdde32
Showing 1 changed file with 87 additions and 35 deletions.
122 changes: 87 additions & 35 deletions utils/hierarchy_handler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
from typing import List, Optional, Protocol, Tuple
from copy import deepcopy
from itertools import groupby
from typing import Any, List, Optional, Protocol, Tuple
import networkx as nx


Cluster = set[str]


class PSection(Protocol):
def get_name(self) -> str:
...
Expand All @@ -12,6 +17,9 @@ def __lt__(self, other: "PSection") -> bool:
def __gt__(self, other: "PSection") -> bool:
...

def __eq__(self, other: Any) -> bool:
...


class HierarchyFactory:
def __init__(self):
Expand Down Expand Up @@ -73,49 +81,94 @@ def add_to_path(
self.main_graph.add_edge(given_name, path_segments[-1])

# (automated) construction
def _check_branch_consistency(self, branch: str) -> bool:
def _check_graph_consistency(self, graph: nx.DiGraph) -> None:
# branches are expected to be internally ordered
# raise an error when __lt__ or __gt__ is not implemented
pass

def _get_extremity(self, branch: str, head: bool=True) -> str:
pass

def _move_along_branch(self, branch: str, up: bool=True) -> str:
# yield the next node (name) along the branch
pass

def _attach_branch(self, branch_1: str, branch_2_head: str) -> str:
# create an edge from branch_2 head to branch_1
pass

def _insert_branch(self, branch_1: str, branch_2_head: str) -> str:
# remove the edge between branch_1 and [branch_1 + dn]
# link branch_2_head to branch_1
# link branch_2_tail to [branch_1 + dn]
pass

def _merge_branch(self, branch_1: str, branch_2: str) -> str:
# compare their tail_1 / head_2 (node with no incoming/outgoing edges)
# if head_2 < tail_1, add an edge from head_2 to tail_1
# if not, repeat the process with head_2 and [tail_1 + 1 * up]
# if a hit, check whether head_2 > [tail_1 + 1 * down], decide on attaching or inserting
pass

def _automated_construct(self) -> None:
for node in graph.nodes():
try:
if graph.nodes[node]["section"] < graph.nodes[node]["section"]:
raise ValueError(
f"Node {node} does not implement the __lt__ method correctly."
)
if graph.nodes[node]["section"] > graph.nodes[node]["section"]:
raise ValueError(
f"Node {node} does not implement the __gt__ method correctly."
)
if not (graph.nodes[node]["section"] == graph.nodes[node]["section"]):
raise ValueError(
f"Node {node} does not implement the __eq__ method correctly."
)
except TypeError:
raise ValueError(f"Node {node} does not implement the comparison methods.")

for edge in graph.edges():
if not (graph.nodes[edge[0]]["section"] < graph.nodes[edge[1]]["section"]):
raise ValueError(
f"Graph is inconsistently ordered: {edge[0]} is not less than or equal {edge[1]}."
)
# Note: this does not check for comparison interoperability between different node types

# TODO: consider handler for cycles: splitting them

def _get_extremities(self, graph: nx.DiGraph, head: bool = True) -> list[str]:
if head:
return [node for node, degree in graph.out_degree() if degree == 0]
return [node for node, degree in graph.in_degree() if degree == 0]

def _move_along_branch(self, branch_node_name: str, up: bool = True) -> list[str]:
if up:
return list(self.temp_graph.predecessors(branch_node_name))
return list(self.temp_graph.successors(branch_node_name))

def _attach_branch(
self,
larger_branch_current: str,
larger_branch_prev: Optional[str],
smaller_branch_head: str,
) -> None:
"""Attach the smaller branch to the larger branch, according to the following rules:
1. both branches are left structurally intact.
2. the smaller branch head is attached at the smallest node that is still larger than itself.
The search for the attachment point is done recursively in a depth-first manner.
"""
if larger_branch_prev is not None:
if smaller_branch_head >= larger_branch_current:
self.main_graph.add_edge(smaller_branch_head, larger_branch_prev)
return

for larger_branch_next in self._move_along_branch(
larger_branch_current, up=False
):
self._attach_branch(
larger_branch_next, larger_branch_current, smaller_branch_head
)

def _sort_heads(self, heads: list[str]) -> list[list[str]]:
comparison_key = lambda head: self.temp_graph.nodes[head]["section"]
return [
list(group)
for _, group in groupby(sorted(heads, comparison_key), key=comparison_key)
]

def _automated_construct(self, graph: nx.DiGraph) -> None:
"""Automatically constructs the temporary graph into the main graph.
This procedure relies on the `PSection` comparison method `__lt__()` and `__gt__()`."""
# split the branch space into 2 parts
# stop when there is only 1 branch left
pass
sorted_heads = self._sort_heads(self._get_extremities(graph, head=True))
for smaller_branch_head, larger_branch_head in zip(
sorted_heads[:-1], sorted_heads[1:]
):
self._attach_branch(larger_branch_head, None, smaller_branch_head)

def construct(self) -> None:
"""Constructs the temporary graph into the main graph."""
# check if the temporary graph is a valid tree
# raise an error indicating missing links and/or cycles if not
# migrate the temporary graph to the main graph if so
# rely on `self._automated_construct` to do the heavy lifting
# raise an error indicating missing links and/or cycles if not
pass
self.main_graph = deepcopy(self.temp_graph)
if not nx.is_weakly_connected(self.temp_graph):
self._check_graph_consistency(self.temp_graph)
self._automated_construct(list(nx.weakly_connected_components(self.main_graph)))

# navigation
def cd(self, path: Optional[str]) -> None:
Expand All @@ -141,7 +194,6 @@ def ls(self, path: Optional[str] = None) -> List[str]:
if self._seg_to_graph(self._split_path(path)) in self.main_graph:
return list(self.main_graph.successors(path))


# searching
def _match_pattern(self, node: str, pattern_parts: List[str], index: int) -> bool:
"""Recursively checks if the node path matches the pattern."""
Expand Down

0 comments on commit bbdde32

Please sign in to comment.