DQN.py
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
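
# Deep Q-Network (DQN): an evaluation network picks actions and is trained every
# step, while a periodically synchronized target network supplies stable Q-value
# targets; transitions are stored in a cyclic replay buffer and sampled in
# random minibatches to break temporal correlations.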
class Net(nn.Module):
    def __init__(self, N_STATES, N_ACTIONS):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(N_STATES, 100)   # hidden layer
        self.fc1.weight.data.normal_(0, 0.1)  # initialization; fix the RNG seed externally for reproducible results
        self.out = nn.Linear(100, N_ACTIONS)  # output layer: one Q-value per action
        self.out.weight.data.normal_(0, 0.1)  # initialization

    def forward(self, x):
        x = F.relu(self.fc1(x))
        action_value = self.out(x)
        return action_value
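
# e.g. Net(4, 2) maps a batch of 4-dimensional states to one Q-value per action:
#   Net(4, 2)(torch.zeros(1, 4)).shape == torch.Size([1, 2])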
class DQN(object):
    def __init__(self, N_STATES, N_ACTIONS, BATCH_SIZE, LR, EPSILON, GAMMA, TARGET_REPLACE_ITER, MEMORY_CAPACITY):
        self.N_STATES = N_STATES
        self.N_ACTIONS = N_ACTIONS
        self.MEMORY_CAPACITY = MEMORY_CAPACITY
        self.EPSILON = EPSILON                          # probability of acting greedily (e.g. 0.9)
        self.BATCH_SIZE = BATCH_SIZE
        self.LR = LR
        self.GAMMA = GAMMA                              # reward discount factor
        self.TARGET_REPLACE_ITER = TARGET_REPLACE_ITER  # how often to sync the target network
        self.eval_net, self.target_net = Net(N_STATES, N_ACTIONS), Net(N_STATES, N_ACTIONS)
        self.learn_step_counter = 0                     # counts learn() calls, drives target-network updates
        self.memory_counter = 0                         # counts stored transitions
        # replay buffer: each row holds one transition (s, a, r, s_)
        self.memory = np.zeros((MEMORY_CAPACITY, N_STATES * 2 + 2))
        self.optimizer = torch.optim.Adam(self.eval_net.parameters(), lr=LR)
        self.loss_func = nn.MSELoss()
    def choose_action(self, x):
        x = torch.unsqueeze(torch.FloatTensor(x), 0)  # add a batch dimension
        if np.random.uniform() < self.EPSILON:        # greedy: pick the action with the highest Q-value
            with torch.no_grad():
                action_value = self.eval_net(x)
            action = torch.max(action_value, 1)[1].item()
        else:                                         # explore: pick a random action
            action = np.random.randint(0, self.N_ACTIONS)
        return action
    def store_transition(self, s, a, r, s_):
        transition = np.hstack((s, [a, r], s_))
        index = self.memory_counter % self.MEMORY_CAPACITY  # when full, overwrite from the beginning
        self.memory[index, :] = transition
        self.memory_counter += 1
    def learn(self):
        # periodically copy the evaluation network's weights into the target network
        if self.learn_step_counter % self.TARGET_REPLACE_ITER == 0:
            self.target_net.load_state_dict(self.eval_net.state_dict())
        self.learn_step_counter += 1
        # sample a minibatch only from the transitions actually stored so far
        sample_index = np.random.choice(min(self.memory_counter, self.MEMORY_CAPACITY), self.BATCH_SIZE)
        b_memory = self.memory[sample_index, :]
        b_s = torch.FloatTensor(b_memory[:, :self.N_STATES])
        b_a = torch.LongTensor(b_memory[:, self.N_STATES:self.N_STATES + 1].astype(int))
        b_r = torch.FloatTensor(b_memory[:, self.N_STATES + 1:self.N_STATES + 2])
        b_s_ = torch.FloatTensor(b_memory[:, -self.N_STATES:])
        q_eval = self.eval_net(b_s).gather(1, b_a)  # Q(s, a) of the actions actually taken
        q_next = self.target_net(b_s_).detach()     # detach: no gradients flow into the target network
        q_target = b_r + self.GAMMA * q_next.max(1)[0].view(self.BATCH_SIZE, 1)  # Bellman target
        loss = self.loss_func(q_eval, q_target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
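
# Minimal training-loop sketch showing how this class is typically driven. It
# assumes a classic Gym-style environment API (reset() returns the observation,
# step() returns four values); 'CartPole-v1' and the hyperparameter values are
# illustrative assumptions, not taken from this file.
if __name__ == '__main__':
    import gym
    env = gym.make('CartPole-v1')
    dqn = DQN(N_STATES=env.observation_space.shape[0], N_ACTIONS=env.action_space.n,
              BATCH_SIZE=32, LR=0.01, EPSILON=0.9, GAMMA=0.9,
              TARGET_REPLACE_ITER=100, MEMORY_CAPACITY=2000)
    for episode in range(400):
        s = env.reset()
        while True:
            a = dqn.choose_action(s)
            s_, r, done, info = env.step(a)
            dqn.store_transition(s, a, r, s_)
            if dqn.memory_counter > dqn.MEMORY_CAPACITY:  # learn once the buffer has filled
                dqn.learn()
            if done:
                break
            s = s_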