forked from tensorflow/models
-
Notifications
You must be signed in to change notification settings - Fork 0
/
replay_buffer.py
executable file
·161 lines (132 loc) · 4.78 KB
/
replay_buffer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Replay buffer.
Implements replay buffer in Python.
"""
import random
import numpy as np
class ReplayBuffer(object):
def __init__(self, max_size):
self.max_size = max_size
self.cur_size = 0
self.buffer = {}
self.init_length = 0
def __len__(self):
return self.cur_size
def seed_buffer(self, episodes):
self.init_length = len(episodes)
self.add(episodes, np.ones(self.init_length))
def add(self, episodes, *args):
"""Add episodes to buffer."""
idx = 0
while self.cur_size < self.max_size and idx < len(episodes):
self.buffer[self.cur_size] = episodes[idx]
self.cur_size += 1
idx += 1
if idx < len(episodes):
remove_idxs = self.remove_n(len(episodes) - idx)
for remove_idx in remove_idxs:
self.buffer[remove_idx] = episodes[idx]
idx += 1
assert len(self.buffer) == self.cur_size
def remove_n(self, n):
"""Get n items for removal."""
# random removal
idxs = random.sample(xrange(self.init_length, self.cur_size), n)
return idxs
def get_batch(self, n):
"""Get batch of episodes to train on."""
# random batch
idxs = random.sample(xrange(self.cur_size), n)
return [self.buffer[idx] for idx in idxs], None
def update_last_batch(self, delta):
pass
class PrioritizedReplayBuffer(ReplayBuffer):
def __init__(self, max_size, alpha=0.2,
eviction_strategy='rand'):
self.max_size = max_size
self.alpha = alpha
self.eviction_strategy = eviction_strategy
assert self.eviction_strategy in ['rand', 'fifo', 'rank']
self.remove_idx = 0
self.cur_size = 0
self.buffer = {}
self.priorities = np.zeros(self.max_size)
self.init_length = 0
def __len__(self):
return self.cur_size
def add(self, episodes, priorities, new_idxs=None):
"""Add episodes to buffer."""
if new_idxs is None:
idx = 0
new_idxs = []
while self.cur_size < self.max_size and idx < len(episodes):
self.buffer[self.cur_size] = episodes[idx]
new_idxs.append(self.cur_size)
self.cur_size += 1
idx += 1
if idx < len(episodes):
remove_idxs = self.remove_n(len(episodes) - idx)
for remove_idx in remove_idxs:
self.buffer[remove_idx] = episodes[idx]
new_idxs.append(remove_idx)
idx += 1
else:
assert len(new_idxs) == len(episodes)
for new_idx, ep in zip(new_idxs, episodes):
self.buffer[new_idx] = ep
self.priorities[new_idxs] = priorities
self.priorities[0:self.init_length] = np.max(
self.priorities[self.init_length:])
assert len(self.buffer) == self.cur_size
return new_idxs
def remove_n(self, n):
"""Get n items for removal."""
assert self.init_length + n <= self.cur_size
if self.eviction_strategy == 'rand':
# random removal
idxs = random.sample(xrange(self.init_length, self.cur_size), n)
elif self.eviction_strategy == 'fifo':
# overwrite elements in cyclical fashion
idxs = [
self.init_length +
(self.remove_idx + i) % (self.max_size - self.init_length)
for i in xrange(n)]
self.remove_idx = idxs[-1] + 1 - self.init_length
elif self.eviction_strategy == 'rank':
# remove lowest-priority indices
idxs = np.argpartition(self.priorities, n)[:n]
return idxs
def sampling_distribution(self):
p = self.priorities[:self.cur_size]
p = np.exp(self.alpha * (p - np.max(p)))
norm = np.sum(p)
if norm > 0:
uniform = 0.0
p = p / norm * (1 - uniform) + 1.0 / self.cur_size * uniform
else:
p = np.ones(self.cur_size) / self.cur_size
return p
def get_batch(self, n):
"""Get batch of episodes to train on."""
p = self.sampling_distribution()
idxs = np.random.choice(self.cur_size, size=n, replace=False, p=p)
self.last_batch = idxs
return [self.buffer[idx] for idx in idxs], p[idxs]
def update_last_batch(self, delta):
"""Update last batch idxs with new priority."""
self.priorities[self.last_batch] = np.abs(delta)
self.priorities[0:self.init_length] = np.max(
self.priorities[self.init_length:])