q_learning.py
import sys
from collections import defaultdict, deque

import numpy as np
import matplotlib.pyplot as plt


def epsilon_greedy(Q, state, nA, eps):
    """Return an epsilon-greedy action for the given state."""
    if np.random.random() > eps:  # exploit: greedy action with probability 1 - eps
        return np.argmax(Q[state])
    # explore: select an action uniformly at random with probability eps
    return np.random.choice(np.arange(nA))
def train(env, num_episodes, alpha, mov_avg=1000, gamma=0.95, EbN0=1):
    """Train a Q-learning agent on env and return the learned action-value function."""
    # initialize action-value function (empty dictionary of arrays)
    Q = defaultdict(lambda: np.zeros(env.nA))
    nA = env.nA
    # apply the requested noise level (previously the EbN0 argument was unused)
    env.set_noise(EbN0)
    # initialize performance monitors
    tmp_scores = deque(maxlen=mov_avg)
    avg_scores = deque(maxlen=num_episodes)
    # loop over episodes with a slowly decaying exploration rate
    eps = 0.9
    for i_episode in range(1, num_episodes + 1):
        if i_episode % 1000 == 0:
            eps = max(eps * 0.9, 1e-3)
        # monitor progress
        if i_episode % 10000 == 0:
            print("\rEpisode {}/{}".format(i_episode, num_episodes), end="")
            sys.stdout.flush()
        state = env.reset()
        total_reward = 0
        while True:
            action = epsilon_greedy(Q, state, nA, eps)
            next_state, reward, done, _ = env.step(action)
            total_reward += reward
            if not done:
                # Q-learning update: the target uses the greedy action in the next state
                td_target = reward + gamma * np.max(Q[next_state])
                Q[state][action] += alpha * (td_target - Q[state][action])
                state = next_state
            else:
                # terminal transition: the target is just the final reward
                Q[state][action] += alpha * (reward - Q[state][action])
                tmp_scores.append(total_reward)
                break
        if i_episode % mov_avg == 0:
            avg_scores.append(np.mean(tmp_scores))
    print(('Best Average Reward over %d Episodes: ' % mov_avg), np.max(avg_scores))
    env.Q = Q
    return Q
def test(env, num_runs, optimal_Q, EbN0=0.1):
    """Evaluate the greedy policy derived from optimal_Q and return the average BER."""
    BER = 0
    policy = lambda state: np.argmax(optimal_Q[state])
    env.set_noise(EbN0)
    max_iters = 10
    for run in range(num_runs):
        state = env.reset()
        action = policy(state)
        i = 0
        while True:
            next_state, _, done, _ = env.step(action)
            if not done and i < max_iters:
                # follow the greedy policy until the episode ends or the step budget runs out
                action = policy(next_state)
                state = next_state
            if done:
                break
            if i >= max_iters:
                print("Agent unable to decode")
                break
            i += 1
        # accumulate the bit error rate of this run
        BER += np.sum(env.z.astype(int) ^ env.codeword.astype(int)) / len(env.z)
    return BER / num_runs
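

# --- Usage sketch (assumptions labelled below) ---
# The functions above expect a gym-style environment exposing nA, reset(),
# step(), set_noise(), and z / codeword attributes. "DecodingEnv" and the
# module it comes from are hypothetical placeholder names for whichever
# environment provides that interface; the hyperparameters are illustrative.
if __name__ == "__main__":
    # from decoding_env import DecodingEnv   # hypothetical import
    # env = DecodingEnv()
    # Q = train(env, num_episodes=100000, alpha=0.1, EbN0=1)
    # ber = test(env, num_runs=1000, optimal_Q=Q, EbN0=0.1)
    # print("Average BER:", ber)
    pass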