-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsearch.py
431 lines (329 loc) · 15.4 KB
/
search.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
"""
Search algorithms and problems.
This code is taken from https://github.com/aimacode/aima-python repository,
official repository of the AIMA book. In particular, it is a subset of
https://github.com/aimacode/aima-python/blob/master/search4e.ipynb notebook.
"""
import sys
import heapq
import math
from collections import defaultdict, deque, Counter
class Problem(object):
"""The abstract class for a formal problem. A new domain subclasses this,
overriding `actions` and `results`, and perhaps other methods.
The default heuristic is 0 and the default action cost is 1 for all states.
When yiou create an instance of a subclass, specify `initial`, and `goal` states
(or give an `is_goal` method) and perhaps other keyword args for the subclass."""
def __init__(self, initial=None, goal=None, **kwds):
self.__dict__.update(initial=initial, goal=goal, **kwds)
def actions(self, state): raise NotImplementedError
def result(self, state, action): raise NotImplementedError
def is_goal(self, state): return state == self.goal
def action_cost(self, s, a, s1): return 1
def h(self, node): return 0
def __str__(self):
return '{}({!r}, {!r})'.format(
type(self).__name__, self.initial, self.goal)
class Node:
"A Node in a search tree."
def __init__(self, state, parent=None, action=None, path_cost=0):
self.__dict__.update(state=state, parent=parent, action=action, path_cost=path_cost)
def __repr__(self): return '<{}>'.format(self.state)
def __len__(self): return 0 if self.parent is None else (1 + len(self.parent))
def __lt__(self, other): return self.path_cost < other.path_cost
failure = Node('failure', path_cost=math.inf) # Indicates an algorithm couldn't find a solution.
cutoff = Node('cutoff', path_cost=math.inf) # Indicates iterative deepening search was cut off.
def expand(problem, node):
"Expand a node, generating the children nodes."
s = node.state
for action in problem.actions(s):
s1 = problem.result(s, action)
cost = node.path_cost + problem.action_cost(s, action, s1)
yield Node(s1, node, action, cost)
def path_actions(node):
"The sequence of actions to get to this node."
if node.parent is None:
return []
return path_actions(node.parent) + [node.action]
def path_states(node):
"The sequence of states to get to this node."
if node in (cutoff, failure, None):
return []
return path_states(node.parent) + [node.state]
FIFOQueue = deque
LIFOQueue = list
class PriorityQueue:
"""A queue in which the item with minimum f(item) is always popped first."""
def __init__(self, items=(), key=lambda x: x):
self.key = key
self.items = [] # a heap of (score, item) pairs
for item in items:
self.add(item)
def add(self, item):
"""Add item to the queuez."""
pair = (self.key(item), item)
heapq.heappush(self.items, pair)
def pop(self):
"""Pop and return the item with min f(item) value."""
return heapq.heappop(self.items)[1]
def top(self): return self.items[0][1]
def __len__(self): return len(self.items)
# Search Algorithms: Best-First
# Best-first search with various f(n) functions gives us different search algorithms.
# Note that A*, weighted A* and greedy search can be given a heuristic function, h, but
# if h is not supplied they use the problem's default h function (if the problem does not
# define one, it is taken as h(n) = 0).
def best_first_search(problem, f):
"Search nodes with minimum f(node) value first."
node = Node(problem.initial)
frontier = PriorityQueue([node], key=f)
reached = {problem.initial: node}
while frontier:
node = frontier.pop()
if problem.is_goal(node.state):
return node
for child in expand(problem, node):
s = child.state
if s not in reached or child.path_cost < reached[s].path_cost:
reached[s] = child
frontier.add(child)
return failure
def best_first_tree_search(problem, f):
"A version of best_first_search without the `reached` table."
frontier = PriorityQueue([Node(problem.initial)], key=f)
while frontier:
node = frontier.pop()
if problem.is_goal(node.state):
return node
for child in expand(problem, node):
if not is_cycle(child):
frontier.add(child)
return failure
def g(n): return n.path_cost
def astar_search(problem, h=None):
"""Search nodes with minimum f(n) = g(n) + h(n)."""
h = h or problem.h
return best_first_search(problem, f=lambda n: g(n) + h(n))
def astar_tree_search(problem, h=None):
"""Search nodes with minimum f(n) = g(n) + h(n), with no `reached` table."""
h = h or problem.h
return best_first_tree_search(problem, f=lambda n: g(n) + h(n))
def weighted_astar_search(problem, h=None, weight=1.4):
"""Search nodes with minimum f(n) = g(n) + weight * h(n)."""
h = h or problem.h
return best_first_search(problem, f=lambda n: g(n) + weight * h(n))
def greedy_bfs(problem, h=None):
"""Search nodes with minimum h(n)."""
h = h or problem.h
return best_first_search(problem, f=h)
def uniform_cost_search(problem):
"Search nodes with minimum path cost first."
return best_first_search(problem, f=g)
def breadth_first_bfs(problem):
"Search shallowest nodes in the search tree first; using best-first."
return best_first_search(problem, f=len)
def depth_first_bfs(problem):
"Search deepest nodes in the search tree first; using best-first."
return best_first_search(problem, f=lambda n: -len(n))
def is_cycle(node, k=30):
"Does this node form a cycle of length k or less?"
def find_cycle(ancestor, k):
return (ancestor is not None and k > 0 and
(ancestor.state == node.state or find_cycle(ancestor.parent, k - 1)))
return find_cycle(node.parent, k)
# Other search algorithms
def breadth_first_search(problem):
"Search shallowest nodes in the search tree first."
node = Node(problem.initial)
if problem.is_goal(problem.initial):
return node
frontier = FIFOQueue([node])
reached = {problem.initial}
while frontier:
node = frontier.pop()
for child in expand(problem, node):
s = child.state
if problem.is_goal(s):
return child
if s not in reached:
reached.add(s)
frontier.appendleft(child)
return failure
def iterative_deepening_search(problem):
"Do depth-limited search with increasing depth limits."
for limit in range(1, sys.maxsize):
result = depth_limited_search(problem, limit)
if result != cutoff:
return result
def depth_limited_search(problem, limit=10):
"Search deepest nodes in the search tree first."
frontier = LIFOQueue([Node(problem.initial)])
result = failure
while frontier:
node = frontier.pop()
if problem.is_goal(node.state):
return node
elif len(node) >= limit:
result = cutoff
elif not is_cycle(node):
for child in expand(problem, node):
frontier.append(child)
return result
def depth_first_recursive_search(problem, node=None):
if node is None:
node = Node(problem.initial)
if problem.is_goal(node.state):
return node
elif is_cycle(node):
return failure
else:
for child in expand(problem, node):
result = depth_first_recursive_search(problem, child)
if result:
return result
return failure
# Problems
# Route finding
# In a RouteProblem, the states are names of "cities" (or other locations), like 'A' for Arad.
# The actions are also city names; 'Z' is the action to move to city 'Z'. The layout of cities
# is given by a separate data structure, a Map, which is a graph where there are vertexes
# (cities), links between vertexes, distances (costs) of those links (if not specified, the
# default is 1 for every link), and optionally the 2D (x, y) location of each city can be
# specified. A RouteProblem takes this Map as input and allows actions to move between
# linked cities. The default heuristic is straight-line distance to the goal, or is uniformly
# zero if locations were not given.
class RouteProblem(Problem):
"""A problem to find a route between locations on a `Map`.
Create a problem with RouteProblem(start, goal, map=Map(...)}).
States are the vertexes in the Map graph; actions are destination states."""
def actions(self, state):
"""The places neighboring `state`."""
return self.map.neighbors[state]
def result(self, state, action):
"""Go to the `action` place, if the map says that is possible."""
return action if action in self.map.neighbors[state] else state
def action_cost(self, s, action, s1):
"""The distance (cost) to go from s to s1."""
return self.map.distances[s, s1]
def h(self, node):
"Straight-line distance between state and the goal."
locs = self.map.locations
return straight_line_distance(locs[node.state], locs[self.goal])
def straight_line_distance(A, B):
"Straight-line distance between two points."
return sum(abs(a - b)**2 for (a, b) in zip(A, B)) ** 0.5
class Map:
"""A map of places in a 2D world: a graph with vertexes and links between them.
In `Map(links, locations)`, `links` can be either [(v1, v2)...] pairs,
or a {(v1, v2): distance...} dict. Optional `locations` can be {v1: (x, y)}
If `directed=False` then for every (v1, v2) link, we add a (v2, v1) link."""
def __init__(self, links, locations=None, directed=False):
if not hasattr(links, 'items'): # Distances are 1 by default
links = {link: 1 for link in links}
if not directed:
for (v1, v2) in list(links):
links[v2, v1] = links[v1, v2]
self.distances = links
self.neighbors = multimap(links)
self.locations = locations or defaultdict(lambda: (0, 0))
def multimap(pairs) -> dict:
"Given (key, val) pairs, make a dict of {key: [val,...]}."
result = defaultdict(list)
for key, val in pairs:
result[key].append(val)
return result
# Some specific RouteProblems
romania = Map(
{('O', 'Z'): 71, ('O', 'S'): 151, ('A', 'Z'): 75, ('A', 'S'): 140, ('A', 'T'): 118,
('L', 'T'): 111, ('L', 'M'): 70, ('D', 'M'): 75, ('C', 'D'): 120, ('C', 'R'): 146,
('C', 'P'): 138, ('R', 'S'): 80, ('F', 'S'): 99, ('B', 'F'): 211, ('B', 'P'): 101,
('B', 'G'): 90, ('B', 'U'): 85, ('H', 'U'): 98, ('E', 'H'): 86, ('U', 'V'): 142,
('I', 'V'): 92, ('I', 'N'): 87, ('P', 'R'): 97},
{'A': ( 76, 497), 'B': (400, 327), 'C': (246, 285), 'D': (160, 296), 'E': (558, 294),
'F': (285, 460), 'G': (368, 257), 'H': (548, 355), 'I': (488, 535), 'L': (162, 379),
'M': (160, 343), 'N': (407, 561), 'O': (117, 580), 'P': (311, 372), 'R': (227, 412),
'S': (187, 463), 'T': ( 83, 414), 'U': (471, 363), 'V': (535, 473), 'Z': (92, 539)})
# r0 = RouteProblem('A', 'A', map=romania)
# 8 Puzzle Problem
# A sliding tile puzzle where you can swap the blank with an adjacent piece, trying to reach
# a goal configuration. The cells are numbered 0 to 8, starting at the top left and going
# row by row left to right. The pieces are numebred 1 to 8, with 0 representing the blank.
# An action is the cell index number that is to be swapped with the blank (not the actual
# number to be swapped but the index into the state).
# There are two disjoint sets of states that cannot be reached from each other. One set
# has an even number of "inversions"; the other has an odd number. An inversion is when
# a piece in the state is larger than a piece that follows it.
class EightPuzzle(Problem):
""" The problem of sliding tiles numbered from 1 to 8 on a 3x3 board,
where one of the squares is a blank, trying to reach a goal configuration.
A board state is represented as a tuple of length 9, where the element at index i
represents the tile number at index i, or 0 if for the empty square, e.g. the goal:
1 2 3
4 5 6 ==> (1, 2, 3, 4, 5, 6, 7, 8, 0)
7 8 _
"""
def __init__(self, initial, goal=(0, 1, 2, 3, 4, 5, 6, 7, 8)):
assert inversions(initial) % 2 == inversions(goal) % 2 # Parity check
self.initial, self.goal = initial, goal
def actions(self, state):
"""The indexes of the squares that the blank can move to."""
moves = ((1, 3), (0, 2, 4), (1, 5),
(0, 4, 6), (1, 3, 5, 7), (2, 4, 8),
(3, 7), (4, 6, 8), (7, 5))
blank = state.index(0)
return moves[blank]
def result(self, state, action):
"""Swap the blank with the square numbered `action`."""
s = list(state)
blank = state.index(0)
s[action], s[blank] = s[blank], s[action]
return tuple(s)
def h1(self, node):
"""The misplaced tiles heuristic."""
return hamming_distance(node.state, self.goal)
def h2(self, node):
"""The Manhattan heuristic."""
X = (0, 1, 2, 0, 1, 2, 0, 1, 2)
Y = (0, 0, 0, 1, 1, 1, 2, 2, 2)
return sum(abs(X[s] - X[g]) + abs(Y[s] - Y[g])
for (s, g) in zip(node.state, self.goal) if s != 0)
def h(self, node): return h2(self, node)
def hamming_distance(A, B):
"Number of positions where vectors A and B are different."
return sum(a != b for a, b in zip(A, B))
def inversions(board):
"The number of times a piece is a smaller number than a following piece."
return sum((a > b and a != 0 and b != 0) for (a, b) in combinations(board, 2))
def board8(board, fmt=(3 * '{} {} {}\n')):
"A string representing an 8-puzzle board"
return fmt.format(*board).replace('0', '_')
# Instrumentation utils
# We'll use CountCalls to wrap a Problem object in such a way that calls to its methods
# are delegated to the original problem, but each call increments a counter. Once we've
# solved the problem, we print out summary statistics.
class CountCalls:
"""Delegate all attribute gets to the object, and count them in ._counts"""
def __init__(self, obj):
self._object = obj
self._counts = Counter()
def __getattr__(self, attr):
"Delegate to the original object, after incrementing a counter."
self._counts[attr] += 1
return getattr(self._object, attr)
def report(searchers, problems, verbose=True):
"""Show summary statistics for each searcher (and on each problem unless verbose is false)."""
for searcher in searchers:
print(searcher.__name__ + ':')
total_counts = Counter()
for p in problems:
prob = CountCalls(p)
soln = searcher(prob)
counts = prob._counts;
counts.update(actions=len(soln), cost=soln.path_cost)
total_counts += counts
if verbose: report_counts(counts, str(p)[:40])
report_counts(total_counts, 'TOTAL\n')
def report_counts(counts, name):
"""Print one line of the counts report."""
print('{:9,d} nodes |{:9,d} goal |{:5.0f} cost |{:8,d} actions | {}'.format(
counts['result'], counts['is_goal'], counts['cost'], counts['actions'], name))