-
Notifications
You must be signed in to change notification settings - Fork 0
/
util.py
136 lines (117 loc) · 5.65 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import heapq, collections, re, sys, time, os, random
############################################################
# Abstract interfaces for search problems and search algorithms.
class SearchProblem:
    # Interface that every search problem implements. Each method below
    # raises NotImplementedError until a subclass overrides it.

    # Return the state the search begins from.
    def startState(self):
        raise NotImplementedError("Override me")

    # Return whether |state| is an end state.
    def isEnd(self, state):
        raise NotImplementedError("Override me")

    # Return a list of (action, newState, cost) tuples, one per edge
    # leaving |state|.
    def succAndCost(self, state):
        raise NotImplementedError("Override me")
class SearchAlgorithm:
    # Interface for search algorithms.
    #
    # Callers invoke solve() on a SearchProblem |problem|; an implementation
    # is expected to leave two attributes behind:
    # - self.actions: the list of actions leading from the start state to an
    #   end state, or None when no such action sequence exists.
    # - self.totalCost: the summed cost along that path, or None when no
    #   valid action sequence exists.
    def solve(self, problem):
        raise NotImplementedError("Override me")
############################################################
# Uniform cost search algorithm (Dijkstra's algorithm).
class UniformCostSearch(SearchAlgorithm):
    # |verbose| controls printing during solve():
    #   >= 1: print the final summary (or "No path found")
    #   >= 2: also print every state as it is explored
    #   >= 3: also print every edge as it is expanded
    def __init__(self, verbose=0):
        self.verbose = verbose

    # Run uniform cost search on |problem|.
    # If a path exists, set self.actions (list of actions from the start
    # state to the end state) and self.totalCost (the end state's priority
    # when it was removed from the frontier). Otherwise, leave them as None.
    # self.numStatesExplored counts the states removed from the frontier.
    def solve(self, problem):
        self.actions = None
        self.totalCost = None
        self.numStatesExplored = 0

        # Initialize data structures
        frontier = PriorityQueue()  # Explored states are maintained by the frontier.
        backpointers = {}  # map state to (action, previous state)

        # Add the start state
        startState = problem.startState()
        frontier.update(startState, 0)

        while True:
            # Remove the state from the queue with the lowest pastCost
            # (priority).
            state, pastCost = frontier.removeMin()
            # removeMin() hands back (None, None) once the frontier is empty;
            # identity comparison avoids invoking a state's custom __eq__.
            if state is None:
                break
            self.numStatesExplored += 1
            # NOTE: every print below takes one parenthesized argument so the
            # same line works as a Python 2 statement and a Python 3 call.
            if self.verbose >= 2:
                print("Exploring %s with pastCost %s" % (state, pastCost))

            # Check if we've reached an end state; if so, extract solution.
            if problem.isEnd(state):
                # Walk the backpointers from the end state to the start
                # state, then reverse to get start-to-end order.
                self.actions = []
                while state != startState:
                    action, prevState = backpointers[state]
                    self.actions.append(action)
                    state = prevState
                self.actions.reverse()
                self.totalCost = pastCost
                if self.verbose >= 1:
                    print("numStatesExplored = %d" % self.numStatesExplored)
                    print("totalCost = %s" % self.totalCost)
                    print("actions = %s" % self.actions)
                return

            # Expand from |state| to new successor states,
            # updating the frontier with each newState.
            for action, newState, cost in problem.succAndCost(state):
                if self.verbose >= 3:
                    print(" Action %s => %s with cost %s + %s" % (action, newState, pastCost, cost))
                if frontier.update(newState, pastCost + cost):
                    # Found better way to go to |newState|, update backpointer.
                    backpointers[newState] = (action, state)

        if self.verbose >= 1:
            print("No path found")
# Data structure for supporting uniform cost search.
class PriorityQueue:
    # Min-priority queue keyed by state, built on a heapq list plus a dict
    # holding each state's best known priority. Lowering a priority pushes a
    # fresh heap entry; removeMin() discards leftover entries for states
    # that were already removed.
    def __init__(self):
        # Sentinel stored in self.priorities once a state has been removed.
        # update() compares new priorities against it, so a removed state is
        # only re-inserted if the new priority is below this value.
        self.DONE = -100000
        self.heap = []  # heapq-ordered list of (priority, state) entries
        self.priorities = {}  # Map from state to priority

    # Insert |state| into the heap with priority |newPriority| if
    # |state| isn't in the heap or |newPriority| is smaller than the existing
    # priority.
    # Return whether the priority queue was updated.
    def update(self, state, newPriority):
        oldPriority = self.priorities.get(state)
        if oldPriority is None or newPriority < oldPriority:
            self.priorities[state] = newPriority
            # Fix: the module is |heapq|; the original referenced an
            # undefined name |heap| and raised NameError on every insert.
            heapq.heappush(self.heap, (newPriority, state))
            return True
        return False

    # Returns (state with minimum priority, priority)
    # or (None, None) if the priority queue is empty.
    def removeMin(self):
        while self.heap:
            priority, state = heapq.heappop(self.heap)
            if self.priorities[state] == self.DONE:
                continue  # Outdated priority, skip
            self.priorities[state] = self.DONE
            return (state, priority)
        return (None, None)  # Nothing left...
############################################################
# Simple examples of search problems to test your code for Problem 1.
# A simple search problem on the number line:
# Start at 0, want to go to 10, costs 1 to move left, 2 to move right.
class NumberLineSearchProblem:
    # Search on the integers: begin at 0, finish at 10.

    # The search always begins at 0.
    def startState(self):
        return 0

    # Only 10 counts as an end state.
    def isEnd(self, state):
        return state == 10

    # Two edges leave every state: one step down ('West', cost 1) and
    # one step up ('East', cost 2), returned in that order.
    def succAndCost(self, state):
        westward = ('West', state - 1, 1)
        eastward = ('East', state + 1, 2)
        return [westward, eastward]
# A simple search problem on a square grid:
# Start at init position, want to go to (0, 0)
# cost 2 to move up/left, 1 to move down/right
class GridSearchProblem(SearchProblem):
    # Search on a |size| x |size| grid of (x, y) cells; the goal is (0, 0).

    # Remember the grid size and the starting cell (x, y).
    def __init__(self, size, x, y):
        self.size = size
        self.start = (x, y)

    # The search begins at the cell given to the constructor.
    def startState(self):
        return self.start

    # Only the cell (0, 0) counts as an end state.
    def isEnd(self, state):
        return state == (0, 0)

    # List the in-bounds moves out of |state| as (action, newState, cost),
    # in the order North, South, West, East. Decreasing a coordinate
    # (North/West) costs 2; increasing one (South/East) costs 1.
    def succAndCost(self, state):
        row, col = state
        limit = self.size
        # Each candidate carries its own bounds check as the last field.
        candidates = [
            ('North', (row - 1, col), 2, row - 1 >= 0),
            ('South', (row + 1, col), 1, row + 1 < limit),
            ('West', (row, col - 1), 2, col - 1 >= 0),
            ('East', (row, col + 1), 1, col + 1 < limit),
        ]
        return [(action, cell, cost)
                for action, cell, cost, inBounds in candidates
                if inBounds]