# test_renderer_14.py (from a fork of Grid2op/grid2op)
import grid2op
from grid2op.Agent import DoNothingAgent
from grid2op.Agent import GreedyAgent, RandomAgent
import numpy as np
import pdb
import warnings

# build the IEEE case 14 environment, silencing the warnings grid2op emits at creation
with warnings.catch_warnings():
    warnings.filterwarnings("ignore")
    env = grid2op.make("case14_realistic")
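
# NOTE (not in the original script): "case14_realistic" is the historical name of this
# environment; recent grid2op releases ship the same grid as "rte_case14_realistic".
# This is an assumption about the installed version -- if make() fails here,
# grid2op.list_available_local_env() shows what is actually available.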


class MyExpertAgent(GreedyAgent):
    def __init__(self, action_space):
        GreedyAgent.__init__(self, action_space)
        # for each step, store the chosen action together with all simulated candidates
        self.saved_score = []

    def act(self, observation, reward, done=False):
        """
        By definition, all "greedy" agents act the same way; the only thing that
        differentiates them is the set of actions they test.
        These actions are defined in the method :func:`._get_tested_action`. This
        :func:`.act` method implements the greedy logic: take the action that maximizes
        the reward obtained on the simulated outcome.

        Parameters
        ----------
        observation: :class:`grid2op.BaseObservation.BaseObservation`
            The current observation of the :class:`grid2op.Environment`
        reward: ``float``
            The current reward, i.e. the reward obtained by the previous action
        done: ``bool``
            Whether the episode has ended or not. Used to maintain gym compatibility

        Returns
        -------
        res: :class:`grid2op.BaseAction.BaseAction`
            The action chosen by the bot / controller / agent.
        """
# print("________________\nbeginning simulate")
self.tested_action = self._get_tested_action(observation)
if len(self.tested_action) > 1:
all_rewards = np.full(shape=len(self.tested_action), fill_value=np.NaN, dtype=np.float)
for i, action in enumerate(self.tested_action):
simul_obs, simul_reward, simul_has_error, simul_info = observation.simulate(action)
all_rewards[i] = simul_reward
# if simul_reward > 19:
# pdb.set_trace()
reward_idx = np.argmax(all_rewards) # rewards.index(max(rewards))
expected_reward = np.max(all_rewards)
best_action = self.tested_action[reward_idx]
# print("BaseAction taken:\n{}".format(best_action))
else:
all_rewards = [None]
expected_reward = None
best_action = self.tested_action[0]
self.saved_score.append(((best_action, expected_reward),
[el for el in zip(self.tested_action, all_rewards)]))
# print("end simulate\n_____________")
return best_action

    def _get_tested_action(self, observation):
        res = [self.action_space({})]  # always test the "do nothing" action
        # try to reconnect any disconnected powerline
        for i, el in enumerate(observation.line_status):
            if not el:
                tmp = np.zeros(self.action_space.n_line, dtype=int)
                tmp[i] = 1
                action = self.action_space({"set_line_status": tmp})
                action = action.update({"set_bus": {"lines_or_id": [(i, 1)], "lines_ex_id": [(i, 1)]}})
                res.append(action)
        # toggle the status of some powerlines
        # line 19 joins substations 12 and 13, line 17 joins 10 and 9 (lines 12 and 10,
        # i.e. 5 to 12 and 5 to 10, are left out)
        for i in [19, 17]:  # adding 10 and 12 leads to actions that cause a divergence, check that!
            tmp = np.full(self.action_space.n_line, fill_value=False, dtype=bool)
            tmp[i] = True
            action = self.action_space({"change_line_status": tmp})
            if not observation.line_status[i]:
                # the toggle reconnects the powerline, so the bus at each end must be set
                action = action.update({"set_bus": {"lines_or_id": [(i, 1)], "lines_ex_id": [(i, 1)]}})
            res.append(action)
        # play with the topology
        # put the powerline going from 1 to 4 with the powerline going from 3 to 4 at substation 4
        action = self.action_space({"change_bus":
                                    {"substations_id": [(4, np.array([False, True, True, False, False]))]}})
        res.append(action)
        # put the powerline from 5 to 12 with the powerline from 5 to 10 at substation 5
        action = self.action_space({"change_bus":
                                    {"substations_id": [(5, np.array([False, True, False, True, False, False]))]}})
        res.append(action)
        # put the powerline from 1 to 4 with the powerline from 1 to 3 at substation 1
        action = self.action_space({"change_bus":
                                    {"substations_id": [(1, np.array([False, False, True, True, False, False]))]}})
        res.append(action)
        return res
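

# A minimal sanity check, not part of the original script: each boolean mask passed to
# "change_bus" above must have one entry per element connected to the targeted
# substation. grid2op exposes those per-substation counts as `sub_info`; if the masks
# are well formed, the assertions below should hold (kept separate so they are easy to
# delete).
for sub_id, mask_len in [(4, 5), (5, 6), (1, 6)]:
    assert env.action_space.sub_info[sub_id] == mask_len, \
        "change_bus mask length does not match substation {}".format(sub_id)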

my_agent = MyExpertAgent(env.action_space)
# my_agent = RandomAgent(env.action_space)
print("Total number of unitary actions possible: {}".format(my_agent.action_space.n))

all_obs = []
obs = env.reset()
all_obs.append(obs)
reward = env.reward_range[0]
done = False
nb_step = 0
while True:
    env.render()
    action = my_agent.act(obs, reward, done)
    obs, reward, done, _ = env.step(action)
    print("Rendering timestep {}".format(nb_step))
    if done:
        break
    all_obs.append(obs)
    nb_step += 1
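
# A short usage sketch, not in the original script: inspect the scores the agent saved.
# The tuple layout follows the append in act():
#     ((best_action, expected_reward), [(candidate_action, simulated_reward), ...])
for step, ((best_action, expected_reward), candidates) in enumerate(my_agent.saved_score):
    if expected_reward is not None:
        print("step {}: expected reward {:.2f} over {} candidate actions".format(
            step, expected_reward, len(candidates)))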