separate_reward_vec_normalize.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Dec 10 10:58:00 2021
@author: wenminggong
VecNormalize wrapper for separate-reward VecEnv,
only normalize observation, not for separate rewards [total_r, robot_r, pref_r].
"""
import numpy as np

from stable_baselines3.common.running_mean_std import RunningMeanStd
from stable_baselines3.common.vec_env.base_vec_env import VecEnv, VecEnvStepReturn
from stable_baselines3.common.vec_env.vec_normalize import VecNormalize


class SeparateRewardVecNormalize(VecNormalize):
    """
    A moving-average, normalizing wrapper for a separate-reward vectorized environment.
    Supports saving/loading the moving averages.

    Only observations are normalized; the separate rewards [total_r, robot_r, pref_r]
    are passed through unchanged (reward normalization is disabled in ``step_wait``).

    :param venv: the vectorized environment to wrap
    :param training: Whether to update the moving averages or not
    :param norm_obs: Whether to normalize observations or not (default: True)
    :param norm_reward: Whether to normalize rewards or not (default: True);
        kept for compatibility with ``VecNormalize``, but unused here
    :param clip_obs: Max absolute value for observations
    :param clip_reward: Max absolute value for discounted rewards
    :param gamma: discount factor
    :param epsilon: To avoid division by zero
    """

    def __init__(
        self,
        venv: VecEnv,
        training: bool = True,
        norm_obs: bool = True,
        norm_reward: bool = True,
        clip_obs: float = 10.0,
        clip_reward: float = 10.0,
        gamma: float = 0.99,
        epsilon: float = 1e-8,
    ):
        VecNormalize.__init__(
            self,
            venv,
            training,
            norm_obs,
            norm_reward,
            clip_obs,
            clip_reward,
            gamma,
            epsilon,
        )
        # SeparateRewardVecEnv returns rewards = np.stack([total_reward, robot_reward, pref_reward], ...),
        # so track return statistics for the three reward components.
        self.ret_rms = RunningMeanStd(shape=(3,))

    def step_wait(self) -> VecEnvStepReturn:
        """
        Apply the sequence of actions to the sequence of environments:
        actions -> (observations, rewards, dones, infos)
        where ``dones`` is a boolean vector indicating whether each episode has ended.
        Observations are normalized; the separate rewards are returned unscaled.
        """
        obs, rewards, dones, infos = self.venv.step_wait()
        self.old_obs = obs
        self.old_reward = rewards

        if self.training:
            if isinstance(obs, dict) and isinstance(self.obs_rms, dict):
                for key in self.obs_rms.keys():
                    self.obs_rms[key].update(obs[key])
            else:
                self.obs_rms.update(obs)
        obs = self.normalize_obs(obs)

        # Reward normalization is intentionally disabled for the separate rewards:
        # if self.training:
        #     self._update_reward(rewards)
        # rewards = self.normalize_reward(rewards)

        # Normalize the terminal observations
        for idx, done in enumerate(dones):
            if not done:
                continue
            if "terminal_observation" in infos[idx]:
                infos[idx]["terminal_observation"] = self.normalize_obs(infos[idx]["terminal_observation"])

        # self.ret[dones] = 0
        return obs, rewards, dones, infos

    def _update_reward(self, reward: np.ndarray) -> None:
        """Update reward normalization statistics (not called by this class's step_wait)."""
        # As in VecNormalize, the statistics are computed over the discounted return,
        # so that normalize_reward divides rewards by the return standard deviation.
        self.ret = self.ret * self.gamma + reward
        self.ret_rms.update(self.ret)
        # self.ret_rms.update(reward)  # alternative: update the moving mean/std of the raw reward directly
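
# ---------------------------------------------------------------------------
# Usage sketch (illustrative): ``make_separate_reward_vec_env`` is a hypothetical
# factory standing in for whatever builds the SeparateRewardVecEnv in the
# surrounding project; it must return rewards of shape (num_envs, 3), i.e.
# [total_r, robot_r, pref_r] per environment.
#
#   venv = make_separate_reward_vec_env(num_envs=4)          # hypothetical factory
#   venv = SeparateRewardVecNormalize(venv, norm_obs=True, norm_reward=False)
#
#   obs = venv.reset()
#   actions = np.stack([venv.action_space.sample() for _ in range(venv.num_envs)])
#   obs, rewards, dones, infos = venv.step(actions)
#   # obs is normalized; rewards keeps the raw (num_envs, 3) separate rewards.
#
#   # Saving / loading the normalization statistics (inherited from VecNormalize):
#   venv.save("separate_reward_vec_normalize.pkl")
#   # venv = SeparateRewardVecNormalize.load("separate_reward_vec_normalize.pkl", raw_venv)
# ---------------------------------------------------------------------------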