-
Notifications
You must be signed in to change notification settings - Fork 1
/
pathcalculator.py
343 lines (238 loc) · 10.3 KB
/
pathcalculator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
"""
CSC111 Project: pathcalculator.py
Module Description
==================
Module which contains the methods and classes related to calculating the shortest path. We use the
heapq module in the implementation of our functions. The main algorithm used for calculating the
shortest path is Dijkstra's Algorithm.
Copyright and Usage Information
===============================
This file is Copyright (c) 2021 Aryaman Modi, Craig Katsube, Garv Sood, Kaartik Issar
"""
from __future__ import annotations
from abc import ABC
from typing import Any
from collections.abc import Iterable, Iterator
import heapq
from graph import Graph
#############################################################################
# PUBLIC INTERFACE
#############################################################################
def get_shortest_path_map(g: Graph,
start: Any, end: Any, points: list) -> dict[Any, dict[Any, Path]]:
"""Return the mapping of relevant shortest paths between points from start to end
Output format:
- dict[a][b] == Shortest path from a to b
- dict[b][a] == Shortest path from b to a
All internal points are mapped to all other internal points
- dict[a][b] for any unique pair a and b in points
The start key is mapped to all internal points (one direction)
- dict[start][a] for any a in points
All internal points are mapped to the end point (one direction)
- dict[a][end] for any a in points
"""
shortest_map = {start: {},
end: {}}
for point in points:
shortest_map[point] = {}
for point in points:
for other in points:
if other != point:
shortest_path = _dijkstra(g, point, other)
reverse = shortest_path.get_reversed()
shortest_map[point][other] = shortest_path
shortest_map[other][point] = reverse
from_start = _dijkstra(g, start, point)
shortest_map[start][point] = from_start
to_end = _dijkstra(g, point, end)
shortest_map[point][end] = to_end
return shortest_map
def convert_shortest_map_to_graph(shortest_map: dict[Any, dict[Any, Path]]) -> Graph:
"""Return the graph containing information about the shortest distance between points
using the edges contained within the shortest_map
"""
shortest_graph = Graph()
points = _get_all_points(shortest_map)
for point in points:
shortest_graph.add_vertex(point, '0', '0')
for point, end_points in shortest_map.items():
for end_point, path in end_points.items():
shortest_graph.add_edge(point, end_point, 1, path.get_path_weight())
return shortest_graph
def get_shortest_map_and_graph(g: Graph, start: Any, end: Any, points: list) ->\
tuple[dict[Any, dict[Any, Path]], Graph]:
"""Return both the shortest map and graph according to get_shortest_map and _graph
"""
shortest_map = get_shortest_path_map(g, start, end, points)
shortest_graph = convert_shortest_map_to_graph(shortest_map)
return shortest_map, shortest_graph
def get_shortest_graph(g: Graph, start: Any, end: Any, points: list) -> Graph:
"""Return a graph containing information about the shortest distance between points
The subgraph containing the sub-points is a complete graph
The start and end points are both connected to all internal points, but not to each other
"""
shortest_map = get_shortest_path_map(g, start, end, points)
return convert_shortest_map_to_graph(shortest_map)
def dijkstra(g: Graph, start: Any, end: Any) -> Path:
"""Return the Path containing the smallest cumulative weight from point a to b"""
return _dijkstra(g, start, end)
class Path(Iterable):
"""Interface for getting information about a path"""
def __len__(self) -> int:
raise NotImplementedError
def get_weight(self) -> float:
"""Return the weight of this Path"""
raise NotImplementedError
def get_path_weight(self) -> float:
"""Return the cumulative weight along the path this Path is a part of"""
raise NotImplementedError
def get_item(self) -> Any:
"""Return the item associated with this Path"""
raise NotImplementedError
def get_reversed(self) -> _Node:
"""Return the Path in reverse order from self"""
raise NotImplementedError
def __iter__(self) -> _PathIterator[Any]:
"""Return an iterator that can traverse the path"""
raise NotImplementedError
def __lt__(self, other: Path) -> bool:
"""Return whether this Path has a smaller weight than the other Path"""
return self.get_path_weight() < other.get_path_weight()
#############################################################################
# PRIVATE INTERFACE
#############################################################################
def _dijkstra(g: Graph, start: Any, end: Any) -> _Node:
"""Return the _Node containing the smallest cumulative weight from point a to b"""
# starts at the end because the path is built in reverse order
item_end = end
p_end = _PathNode(item_end, 0)
visited_vertices = {item_end: p_end}
heap = [p_end]
def recursive_dijkstra() -> _Node:
"""Recursively calls the dijkstra alg on the globally accessible heap"""
heap_is_not_empty = (heap != [])
if heap_is_not_empty:
p = heapq.heappop(heap)
v = p.get_item()
visited_vertices[v] = p
end_of_path = (v == start)
if end_of_path:
return p
for neighbour_vertex in g.get_neighbours(v):
u = neighbour_vertex.item
if u not in visited_vertices:
weight = g.get_weight(u, v)
next_path = _PathNode(u, weight, p)
heapq.heappush(heap, next_path)
return recursive_dijkstra()
else:
return _NullPathNode()
return recursive_dijkstra()
def _get_all_points(shortest_map: dict[Any, dict[Any, Path]]) -> set[Any]:
key_set = set(shortest_map)
one_value_set = set(list(shortest_map.values())[0])
# the key_set contains all points except the end point.
# All value sets contain the end point (so only one is needed to get it)
return set.union(key_set, one_value_set)
class _Node(Path, ABC):
"""Abstract Iterable class for hiding the interface for manually iterating through the path"""
def get_next(self) -> _Node:
"""Return the _Node preceding this _Node in the path"""
raise NotImplementedError
def __iter__(self) -> _PathIterator:
return _ItemIterator(self)
class _NullPathNode(_Node):
"""Null object pattern to represent the end of a path"""
def get_item(self) -> Any:
"""Return None since this _Node has no item"""
return None
def get_weight(self) -> float:
"""There is no parent, hence no weight, so return 0"""
return 0
def get_path_weight(self) -> float:
"""There is no path, hence no weight, so return 0"""
return 0
def get_reversed(self) -> _Node:
"""Return a copy of self since no reverse exists"""
return _NullPathNode()
def get_next(self) -> _Node:
"""Raise a stop iteration error because no next exists"""
raise StopIteration
def __len__(self) -> int:
return 0
class _PathNode(_Node):
"""Path node for containing information about a path
"""
# Private Instance Attributes:
# - item: the value of this node
# - next: the node in the path prior to this one
# - _path_weight: the cumulative weight along the entire path
# - _weight: the weight between this node and its self.next
# - _size: the size of this path
_item: Any
_next: _PathNode
_path_weight: float
_weight: float
_size: int
def __init__(self, item: Any, weight: float, parent: _PathNode = _NullPathNode()) -> None:
self._next = parent
self._item = item
self._size = len(parent) + 1
self._weight = weight
self._path_weight = parent.get_path_weight() + weight
def __len__(self) -> int:
return self._size
def get_item(self) -> Any:
"""Return the item associated with this _PathNode"""
return self._item
def get_reversed(self) -> _Node:
"""Return the _PathNode at the end of an entirely reversed chain"""
parent = _NullPathNode()
for node in _NodeIterator(self):
parent = _PathNode(node.get_item(), node.get_weight(), parent)
return parent
def get_next(self) -> _Node:
"""Return the _Node preceding this one in the path"""
return self._next
def get_path_weight(self) -> float:
"""Return the cumulative weight of this path"""
return self._path_weight
def get_weight(self) -> float:
"""Return the weight of this path node to its parent"""
return self._weight
class _PathIterator(Iterator[Any]):
"""Iterator pattern class for iterating through a Path"""
def __next__(self) -> Any:
raise NotImplementedError
class _ItemIterator(_PathIterator[Any]):
"""Iterator pattern class for iterating through the items in a path formed by _PathNodes
Iterates through the _PathNode .next chain in order and returns the item of each _Node.
"""
_wrapped_iterator: _NodeIterator
def __init__(self, initial_node: _Node) -> None:
self._wrapped_iterator = _NodeIterator(initial_node)
def __next__(self) -> Any:
node = self._wrapped_iterator.__next__()
return node.get_item()
class _NodeIterator(_PathIterator[_Node]):
"""Iterator pattern class for iterating through the _Node chain
Returns the _Node object at each iteration
"""
_current_node: _Node
def __init__(self, initial_node: _Node) -> None:
self._current_node = initial_node
def __next__(self) -> _Node:
cur = self._current_node
self._current_node = self._current_node.get_next()
return cur
if __name__ == '__main__':
import doctest
doctest.testmod()
import python_ta
python_ta.check_all(config={
'max-line-length': 100,
'disable': ['E1136', 'E9971'],
'extra-imports': ['heapq', 'graph', 'abc'],
'allowed-io': [],
'max-nested-blocks': 5
})