diff --git a/webots/controllers/RL_Supervisor/agent.py b/webots/controllers/RL_Supervisor/agent.py
index f45ef80..20ef40a 100644
--- a/webots/controllers/RL_Supervisor/agent.py
+++ b/webots/controllers/RL_Supervisor/agent.py
@@ -27,14 +27,15 @@
 ################################################################################
 # Imports
 ################################################################################
+
 import csv
 import os
 import struct
-import numpy as np  # pylint: disable=import-error
-import tensorflow as tf  # pylint: disable=import-error
-import tensorflow_probability as tfp  # pylint: disable=import-error
-from trajectory_buffer import Memory
-from networks import Models
+import numpy as np
+import tensorflow as tf
+import tensorflow_probability as tfp
+from trajectory_buffer import Memory
+from networks import Models
 
 ################################################################################
 # Variables
 ################################################################################
@@ -68,6 +69,16 @@
 MIN_STD_DEV = 0.01  # Minimum standard deviation
 STD_DEV_FACTOR = 0.995  # Discounter standard deviation factor
 
+TRANSLATION_FIELD = "translation"
+ROTATION_FIELD = "rotation"
+
+IDLE = "IDLE_STATE"
+READY = "READY_STATE"
+TRAINING = "TRAINING_STATE"
+
+DIRECTORY = "logs"
+FILE_DIRECTORY = "training_logs.csv"
+
 ################################################################################
 # Classes
 ################################################################################
@@ -94,37 +105,34 @@ def __init__(
         self,
         max_buffer_length=65536,
     ):
         self.__serialmux = smp_server
-        self.__policy_clip = policy_clip
         self.__chkpt_dir = chkpt_dir
         self.train_mode = False
         self.__top_speed = top_speed
+        self.__std_dev = 0.05
         self.__memory = Memory(batch_size, max_buffer_length, gamma, gae_lambda)
-        self.__neural_network = Models(actor_alpha, critic_alpha)
+        self.__neural_network = Models(actor_alpha, critic_alpha, self.__std_dev, policy_clip)
         self.__training_index = 0  # Track batch index during training
         self.__current_batch = None  # Saving of the current batch which is in process
-        self.__std_dev = 0.05
         self.n_epochs = 3
         self.done = False
         self.action = None
         self.value = None
         self.adjusted_log_prob = None
         self.num_episodes = 0
-        self.state = "IDLE"
+        self.state = IDLE
         self.data_sent = True
         self.unsent_data = []
-        self.critic_loss_history = []
-        self.actor_loss_history = []
         self.reward_history = []
 
     def set_train_mode(self):
         """Set the Agent mode to train mode."""
         self.train_mode = True
-        self.state = "READY"
+        self.state = READY
 
     def set_drive_mode(self):
         """Set the Agent mode to drive mode."""
         self.train_mode = False
-        self.state = "READY"
+        self.state = READY
         self.num_episodes = 0
 
     def store_transition(
@@ -235,11 +243,11 @@ def send_motor_speeds(self, state):
         right_motor_speed = int(self.__top_speed + speed_difference)
 
         control_data = struct.pack("2H", left_motor_speed, right_motor_speed)
-        self.data_sent = self.__serialmux.send_data("SPEED_SET", control_data)
+        self.data_sent = self.__serialmux.send_data(SPEED_SET_CHANNEL_NAME, control_data)
 
         # Failed to send data. Appends the data to unsent_data List.
         if self.data_sent is False:
-            self.unsent_data.append(("SPEED_SET", control_data))
+            self.unsent_data.append((SPEED_SET_CHANNEL_NAME, control_data))
 
     def update(self, robot_node):
         """
@@ -251,19 +259,19 @@
         """
 
         # Checks whether the sequence has ended if it is set to Training mode.
-        if (self.train_mode is True) and (
-            (self.done is True) or (self.__memory.is_memory_full() is True)
-        ):
+        if self.train_mode is True and (
+                self.done is True or self.__memory.is_memory_full() is True):
+
             cmd_payload = struct.pack("B", CMD_ID_SET_TRAINING_STATE)
-            self.data_sent = self.__serialmux.send_data("CMD", cmd_payload)
+            self.data_sent = self.__serialmux.send_data(COMMAND_CHANNEL_NAME, cmd_payload)
 
             # Failed to send data. Appends the data to unsent_data List.
             if self.data_sent is False:
-                self.unsent_data.append(("CMD", cmd_payload))
+                self.unsent_data.append((COMMAND_CHANNEL_NAME, cmd_payload))
 
             # Stopping condition for sequence was reached.
             self.reinitialize(robot_node)
-            self.state = "TRAINING"
+            self.state = TRAINING
 
         # Checks whether the sequence has ended if it is set to driving mode.
         if (self.train_mode is False) and (self.done is True):
 
@@ -272,20 +280,20 @@
             cmd_payload = struct.pack("B", CMD_ID_SET_READY_STATE)
 
             self.data_sent = self.__serialmux.send_data(
-                "SPEED_SET", motorcontrol
+                SPEED_SET_CHANNEL_NAME, motorcontrol
             )  # stop the motors immediately
 
             # Failed to send data. Appends the data to unsent_data List
             if self.data_sent is False:
-                self.unsent_data.append(("SPEED_SET", motorcontrol))
+                self.unsent_data.append((SPEED_SET_CHANNEL_NAME, motorcontrol))
 
             self.reinitialize(robot_node)
-            self.data_sent = self.__serialmux.send_data("CMD", cmd_payload)
+            self.data_sent = self.__serialmux.send_data(COMMAND_CHANNEL_NAME, cmd_payload)
 
             # Failed to send data. Appends the data to unsent_data List
             if self.data_sent is False:
-                self.unsent_data.append(("CMD", cmd_payload))
-            self.state = "IDLE"
+                self.unsent_data.append((COMMAND_CHANNEL_NAME, cmd_payload))
+            self.state = IDLE
 
     def normalize_sensor_data(self, sensor_data):
         """
@@ -300,8 +308,8 @@
         NumPy array of float32: Normalized Sensor Data
         """
 
-        sensor_data = np.array(sensor_data) / MAX_SENSOR_VALUE
-        return sensor_data
+        normalized_sensor_data = np.array(sensor_data) / MAX_SENSOR_VALUE
+        return normalized_sensor_data
 
     def determine_reward(self, sensor_data):
         """
@@ -315,14 +323,12 @@
 
         Returns
         ----------
-        float: the Resulting Reward
+        float: The Resulting Reward.
         """
         reward = self.__memory.calculate_reward(sensor_data)
         return reward
 
-    # pylint: disable=too-many-arguments
-    # pylint: disable=too-many-locals
     def learn(self, states, actions, old_probs, values, rewards, advantages):
         """
         Perform training to optimize model weights.
 
         Parameters
         ----------
         states: The saved states observed during interactions with the environment.
         actions: The saved actions taken in response to the observed states.
         old_probs: The saved probabilities of the actions taken, based on the previous policy.
         values: The saved estimated values of the observed states.
         rewards: The saved rewards received for taking the actions.
         advantages: the computed advantage values for each state in a given Data size.
""" + # scales the sensor data to a range between 0 and 1 + m_states = self.normalize_sensor_data(states) for _ in range(self.n_epochs): - # optimize Actor Network weights - with tf.GradientTape() as tape: - states = tf.convert_to_tensor(states) - actions = tf.convert_to_tensor(actions) - old_probs = tf.convert_to_tensor(old_probs) - - # Forward pass through the actor network to get the action mean - predict_mean = self.__neural_network.actor_network(states) - - # Create the normal distribution with the predicted mean - new_dist = tfp.distributions.Normal(predict_mean, self.__std_dev) - - # Invert the tanh transformation to recover the original actions before tanh - untransformed_actions = tf.atanh(actions) - - new_log_prob = new_dist.log_prob(untransformed_actions) + states = tf.convert_to_tensor(m_states) + actions = tf.convert_to_tensor(actions) + old_probs = tf.convert_to_tensor(old_probs) - # Compute the log of the Jacobian for the tanh transformation - jacobian_log_det = tf.math.log(1 - tf.square(actions)) - adjusted_new_log_prob = new_log_prob - jacobian_log_det - - # The ratio between the new model and the old model’s action log probabilities - prob_ratio = tf.exp(adjusted_new_log_prob - old_probs) - - # If the ratio is too large or too small, it will be - # clipped according to the surrogate function. - weighted_probs = prob_ratio * advantages - clipped_probs = tf.clip_by_value( - prob_ratio, 1 - self.__policy_clip, 1 + self.__policy_clip - ) - weighted_clipped_probs = clipped_probs * advantages - - # Policy Gradient Loss - actor_loss = -tf.reduce_mean( - tf.minimum(weighted_probs, weighted_clipped_probs) - ) - - # calculate gradient - actor_params = self.__neural_network.actor_network.trainable_variables - actor_grads = tape.gradient(actor_loss, actor_params) - self.__neural_network.actor_optimizer.apply_gradients( - zip(actor_grads, actor_params) - ) + # optimize Actor Network weights + self.__neural_network.compute_actor_gradient(states, actions, old_probs, advantages) # optimize Critic Network weights - with tf.GradientTape() as tape: - - # The critical value represents the expected return from state 𝑠𝑡. - # It provides an estimate of how good it is to be in a given state. 
-                critic_value = self.__neural_network.critic_network(states)
-
-                # the total discounted reward accumulated from time step 𝑡
-                estimate_returns = advantages + values
+            self.__neural_network.compute_critic_gradient(states, values, advantages)
 
-                # Generate loss
-                critic_loss = tf.math.reduce_mean(
-                    tf.math.pow(estimate_returns - critic_value, 2)
-                )
-
-            # calculate gradient
-            critic_params = self.__neural_network.critic_network.trainable_variables
-            critic_grads = tape.gradient(critic_loss, critic_params)
-            self.__neural_network.critic_optimizer.apply_gradients(
-                zip(critic_grads, critic_params)
-            )
-            self.actor_loss_history.append(actor_loss.numpy())
-            self.critic_loss_history.append(critic_loss.numpy())
-            self.reward_history.append(sum(rewards))
+        # Save the rewards received
+        self.reward_history.append(sum(rewards))
 
-            # saving logs in a CSV file
-            self.save_logs_to_csv()
+        # saving logs in a CSV file
+        self.save_logs_to_csv()
 
     def save_logs_to_csv(self):
         """Function for saving logs in a CSV file"""
 
         # Ensure the directory exists
-        log_dir = "logs"
+        log_dir = DIRECTORY
         os.makedirs(log_dir, exist_ok=True)
 
-        log_file = os.path.join(log_dir, "training_logs.csv")
+        log_file = os.path.join(log_dir, FILE_DIRECTORY)
 
         with open(log_file, mode="w", encoding="utf-8", newline="") as file:
             writer = csv.writer(file)
@@ -425,8 +377,8 @@
             for indx, reward in enumerate(self.reward_history):
                 writer.writerow(
                     [
-                        self.actor_loss_history[indx],
-                        self.critic_loss_history[indx],
+                        self.__neural_network.actor_loss_history[indx],
+                        self.__neural_network.critic_loss_history[indx],
                         reward,
                     ]
                 )
@@ -469,14 +421,14 @@ def perform_training(self):
             self.__current_batch = None
             self.done = False
             self.__memory.clear_memory()
-            self.state = "IDLE"
+            self.state = IDLE
             self.num_episodes += 1
 
             cmd_payload = struct.pack("B", CMD_ID_SET_READY_STATE)
-            self.data_sent = self.__serialmux.send_data("CMD", cmd_payload)
+            self.data_sent = self.__serialmux.send_data(COMMAND_CHANNEL_NAME, cmd_payload)
 
             # Failed to send data. Appends the data to unsent_data List
             if self.data_sent is False:
-                self.unsent_data.append(("CMD", cmd_payload))
+                self.unsent_data.append((COMMAND_CHANNEL_NAME, cmd_payload))
 
         # Minimize standard deviation until the minimum standard deviation is reached
         self.__std_dev = self.__std_dev * STD_DEV_FACTOR
@@ -490,8 +442,8 @@ def reinitialize(self, robot_node):
         ----------
         robot_node: The Robot interface
         """
-        trans_field = robot_node.getField("translation")
-        rot_field = robot_node.getField("rotation")
+        trans_field = robot_node.getField(TRANSLATION_FIELD)
+        rot_field = robot_node.getField(ROTATION_FIELD)
         initial_position = POSITION_DATA
         initial_orientation = ORIENTATION_DATA
         trans_field.setSFVec3f(initial_position)
diff --git a/webots/controllers/RL_Supervisor/networks.py b/webots/controllers/RL_Supervisor/networks.py
index 1535330..9deee19 100644
--- a/webots/controllers/RL_Supervisor/networks.py
+++ b/webots/controllers/RL_Supervisor/networks.py
@@ -28,35 +28,43 @@
 # Imports
 ################################################################################
 
-from tensorflow import keras  # pylint: disable=import-error
-from keras import layers  # pylint: disable=import-error
-from keras.regularizers import l2  # pylint: disable=import-error
+import tensorflow as tf
+import tensorflow_probability as tfp
+from tensorflow import keras
+from keras import layers
+from keras.regularizers import l2
 
 ################################################################################
 # Variables
 ################################################################################
 
+# Constants
+NUM_SENSORS = 5  # Assuming 5 sensor inputs
 
 ################################################################################
 # Classes
 ################################################################################
 
 
-class Models:
+class Models:  # pylint: disable=too-many-instance-attributes
     """Class for building networks of actors and critics."""
 
-    def __init__(self, actor_alpha, critic_alpha):
+    def __init__(self, actor_alpha, critic_alpha, std_dev, policy_clip):
         self.__actor_learning_rate = actor_alpha
         self.__critic_learning_rate = critic_alpha
         self.actor_network = self.build_actor_network()
         self.critic_network = self.build_critic_network()
+        self.std_dev = std_dev
+        self.policy_clip = policy_clip
         self.actor_optimizer = keras.optimizers.Adam(self.__actor_learning_rate)
         self.critic_optimizer = keras.optimizers.Adam(self.__critic_learning_rate)
+        self.critic_loss_history = []
+        self.actor_loss_history = []
 
     def build_actor_network(self):
         """Build Actor Network."""
 
-        state_input = layers.Input(shape=(5,))  # Assuming 5 sensor inputs
+        state_input = layers.Input(shape=(NUM_SENSORS,))
         fc1 = layers.Dense(
             64,
             activation="relu",
@@ -90,7 +98,7 @@ def build_critic_network(self):
         """Build Critic Network"""
 
-        state_input = layers.Input(shape=(5,))  # Assuming 5 sensor inputs
+        state_input = layers.Input(shape=(NUM_SENSORS,))
         fc1 = layers.Dense(
             64,
             activation="relu",
@@ -116,6 +124,115 @@
 
         return keras.models.Model(inputs=state_input, outputs=value)
 
+    def compute_critic_gradient(self, states, values, advantages):
+        """Optimize Critic Network weights.
+
+        Parameters
+        ----------
+        states: The saved states observed during interactions with the environment.
+        values: The saved estimated values of the observed states.
+        advantages: The computed advantage values for each state in a given batch.
+ """ + + with tf.GradientTape() as tape: + + # The critical value represents the expected return from state 𝑠𝑡. + # It provides an estimate of how good it is to be in a given state. + critic_value = self.critic_network(states) + + # the total discounted reward accumulated from time step 𝑡 + estimate_returns = advantages + values + + # Generate loss + critic_loss = tf.math.reduce_mean( + tf.math.pow(estimate_returns - critic_value, 2) + ) + # calculate gradient + critic_params = self.critic_network.trainable_variables + critic_grads = tape.gradient(critic_loss, critic_params) + self.critic_optimizer.apply_gradients( + zip(critic_grads, critic_params) + ) + + # save the critic Loss + self.critic_loss_history.append(critic_loss.numpy()) + + def calculate_adjusted_log_probability(self, states, actions): + """ The function computes the logarithmic probability of a given action. + + Parameters + ---------- + states: The saved states observed during interactions with the environment. + values: The saved estimated values of the observed states. + + Returns + ---------- + A TensorFlow tensor with the data type float32: The logarithmic probability + of a given action + + """ + # Forward pass through the actor network to get the action mean + predict_mean = self.actor_network(states) + + # Create the normal distribution with the predicted mean + new_dist = tfp.distributions.Normal(predict_mean, self.std_dev) + + # Invert the tanh transformation to recover the original actions before tanh + untransformed_actions = tf.atanh(actions) + + new_log_prob = new_dist.log_prob(untransformed_actions) + + # Compute the log of the Jacobian for the tanh transformation + # adding 1e-6 ensures that the Value remains stable and avoids potential issues + # during computation + jacobian_log_det = tf.math.log(1 - tf.square(actions) + 1e-6) + + adjusted_log_prob = new_log_prob - jacobian_log_det + + return adjusted_log_prob + + def compute_actor_gradient(self, states, actions, old_probs, advantages): + """ optimize Actor Network weights. + + Parameters + ---------- + states: The saved states observed during interactions with the environment. + actions: The saved actions taken in response to the observed states. + old_probs: The saved probabilities of the actions taken, based on the previous policy. + values: The saved estimated values of the observed states. + advantages: the computed advantage values for each state in a given Data size. + """ + + with tf.GradientTape() as tape: + + adjusted_new_log_prob = self.calculate_adjusted_log_probability(states, actions) + + # The ratio between the new model and the old model’s action log probabilities + prob_ratio = tf.exp(adjusted_new_log_prob - old_probs) + + # If the ratio is too large or too small, it will be + # clipped according to the surrogate function. 
+            weighted_probs = prob_ratio * advantages
+            clipped_probs = tf.clip_by_value(
+                prob_ratio, 1 - self.policy_clip, 1 + self.policy_clip
+            )
+            weighted_clipped_probs = clipped_probs * advantages
+
+            # Policy Gradient Loss
+            actor_loss = -tf.reduce_mean(
+                tf.minimum(weighted_probs, weighted_clipped_probs)
+            )
+
+        # calculate gradient
+        actor_params = self.actor_network.trainable_variables
+        actor_grads = tape.gradient(actor_loss, actor_params)
+        self.actor_optimizer.apply_gradients(
+            zip(actor_grads, actor_params)
+        )
+
+        # save the Actor Loss
+        self.actor_loss_history.append(actor_loss.numpy())
+
 
 ################################################################################
 # Functions
diff --git a/webots/controllers/RL_Supervisor/rl_supervisor.py b/webots/controllers/RL_Supervisor/rl_supervisor.py
index b620956..e257474 100644
--- a/webots/controllers/RL_Supervisor/rl_supervisor.py
+++ b/webots/controllers/RL_Supervisor/rl_supervisor.py
@@ -68,6 +68,10 @@
 SENSOR_ID_MOST_LEFT = 0
 SENSOR_ID_MOST_RIGHT = 4
 
+IDLE = "IDLE_STATE"
+READY = "READY_STATE"
+TRAINING = "TRAINING_STATE"
+
 # Path of saved models
 PATH = "models/"
 
@@ -76,7 +80,7 @@
 ################################################################################
 
 
-class RobotController:  # pylint: disable=too-many-instance-attributes
+class RobotController:
     """Class for data flow control logic."""
 
     def __init__(self, smp_server, tick_size, agent):
@@ -86,7 +90,6 @@ def __init__(self, smp_server, tick_size, agent):
         self.__no_line_detection_count = 0
         self.__timestamp = 0  # Elapsed time since reset [ms]
         self.last_sensor_data = None
-        self.start_stop_line_detected = False
         self.steps = 0
 
     def callback_status(self, payload: bytearray) -> None:
@@ -102,6 +105,7 @@ def callback_line_sensors(self, payload: bytearray) -> None:
         sensor_data = struct.unpack("5H", payload)
         self.steps += 1
 
+        is_start_stop_line_detected = False
         # Determine lost line condition
         if all(value == 0 for value in sensor_data):
             self.__no_line_detection_count += 1
@@ -111,19 +115,18 @@
         # Detect start/stop line
         if ((sensor_data[SENSOR_ID_MOST_LEFT] >= LINE_SENSOR_ON_TRACK_MIN_VALUE) and
                 (sensor_data[SENSOR_ID_MOST_RIGHT] >= LINE_SENSOR_ON_TRACK_MIN_VALUE)):
-            self.start_stop_line_detected = True
+            is_start_stop_line_detected = True
 
         # Detect Start/Stop Line before Finish Trajectories
-        if (self.start_stop_line_detected is True) and (self.steps < MIN_NUMBER_OF_STEPS):
+        if (is_start_stop_line_detected is True) and (self.steps < MIN_NUMBER_OF_STEPS):
             sensor_data = list(sensor_data)
-            sensor_data[SENSOR_ID_MOST_LEFT] = 0
+            sensor_data[SENSOR_ID_MOST_LEFT]  = 0
             sensor_data[SENSOR_ID_MOST_RIGHT] = 0
-            self.start_stop_line_detected = False
+            is_start_stop_line_detected = False
 
         # sequence stop criterion debounce no line detection and start/stop line detected
-        if self.__no_line_detection_count >= 30 or (
-            (self.start_stop_line_detected is True) and (self.steps >= MIN_NUMBER_OF_STEPS)
-        ):
+        if ((self.__no_line_detection_count >= 30) or ((is_start_stop_line_detected is True)
+                and (self.steps >= MIN_NUMBER_OF_STEPS))):
             self.__agent.done = True
             self.__no_line_detection_count = 0
             self.steps = 0
@@ -139,9 +142,8 @@ def callback_line_sensors(self, payload: bytearray) -> None:
 
         # Start storage The data after the second received sensor data
         if self.last_sensor_data is not None:
-            normalized_sensor_data = self.__agent.normalize_sensor_data(self.last_sensor_data)
             self.__agent.store_transition(
-                normalized_sensor_data,
+                self.last_sensor_data,
                 self.__agent.action,
                 self.__agent.adjusted_log_prob,
                 self.__agent.value,
@@ -151,7 +153,7 @@
         self.last_sensor_data = sensor_data
 
         # Sends the motor speeds to the robot.
-        if self.__agent.done is False and self.__agent.state == "READY":
+        if self.__agent.done is False and self.__agent.state == READY:
             self.__agent.send_motor_speeds(sensor_data)
 
     def callback_mode(self, payload: bytearray) -> None:
@@ -191,6 +193,20 @@ def process(self):
         # process new data (callbacks will be executed)
         self.__smp_server.process(self.__timestamp)
 
+    def manage_agent_cycle(self, robot_node):
+        """The function controls the Agent's behavior."""
+        if self.__agent.state == READY:
+            self.__agent.update(robot_node)
+
+        # Start the training
+        elif self.__agent.state == TRAINING:
+            self.last_sensor_data = None
+            self.__agent.perform_training()
+
+            # save model
+            if (self.__agent.num_episodes > 1) and (self.__agent.num_episodes % 50 == 0):
+                self.__agent.save_models()
+
 
 ################################################################################
 # Functions
@@ -198,7 +214,6 @@ def process(self):
 
 # pylint: disable=duplicate-code
-# pylint: disable=too-many-branches
 # pylint: disable=too-many-statements
 def main_loop():
     """Main loop:
@@ -275,20 +290,7 @@ def main_loop():
     while supervisor.step(timestep) != -1:
         controller.process()
 
-        if agent.state == "READY":
-            agent.update(robot_node)
-
-        # Start the training
-        elif agent.state == "TRAINING":
-            supervisor.last_sensor_data = None
-            agent.perform_training()
-
-            print(f"#{agent.num_episodes} actor loss: {agent.actor_loss_history[-1]:.4f},"
-                  f"critic loss: {agent.critic_loss_history[-1]:.4f}")
-
-            # save model
-            if (agent.num_episodes > 1) and (agent.num_episodes % 50 == 0):
-                agent.save_models()
+        controller.manage_agent_cycle(robot_node)
 
         # Resent any unsent Data
         if agent.unsent_data:
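For reference, the math that compute_actor_gradient and calculate_adjusted_log_probability implement (tanh-squashed Gaussian log-probability with the 1e-6 stabilized Jacobian term, and the clipped PPO surrogate loss) can be exercised in isolation. The sketch below is illustrative only and is not part of the patch: it reproduces the same formulas with plain NumPy, and the helper names, the array shapes, and the POLICY_CLIP value are assumptions (STD_DEV mirrors the Agent's initial 0.05 standard deviation).

# Standalone sketch of the tanh-squashed log-probability and the clipped PPO
# surrogate loss used above. Illustrative only; names and values are assumed.
import numpy as np

STD_DEV = 0.05      # assumed, mirrors the Agent's initial standard deviation
POLICY_CLIP = 0.2   # assumed clipping range


def gaussian_log_prob(x, mean, std):
    """Element-wise log density of a Normal(mean, std) distribution."""
    return -0.5 * ((x - mean) / std) ** 2 - np.log(std) - 0.5 * np.log(2.0 * np.pi)


def adjusted_log_prob(actions, predicted_mean, std=STD_DEV):
    """Log-prob of tanh-squashed actions: Gaussian log-prob minus the tanh Jacobian term."""
    untransformed_actions = np.arctanh(actions)  # invert the tanh squashing
    log_prob = gaussian_log_prob(untransformed_actions, predicted_mean, std)
    jacobian_log_det = np.log(1.0 - np.square(actions) + 1e-6)
    return log_prob - jacobian_log_det


def clipped_surrogate_loss(new_log_probs, old_log_probs, advantages, clip=POLICY_CLIP):
    """Negated PPO clipped surrogate objective (lower is better)."""
    prob_ratio = np.exp(new_log_probs - old_log_probs)
    clipped_ratio = np.clip(prob_ratio, 1.0 - clip, 1.0 + clip)
    return -np.mean(np.minimum(prob_ratio * advantages, clipped_ratio * advantages))


# Tiny usage example with made-up numbers.
actions = np.array([0.2, -0.4, 0.1])
old_lp = adjusted_log_prob(actions, predicted_mean=np.array([0.25, -0.35, 0.00]))
new_lp = adjusted_log_prob(actions, predicted_mean=np.array([0.20, -0.40, 0.05]))
advantages = np.array([0.5, -0.2, 1.0])
print(clipped_surrogate_loss(new_lp, old_lp, advantages))

The 1e-6 term only guards against log(0) when an action saturates at ±1; it matches the constant used in the patched calculate_adjusted_log_probability.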