From 5002ecd1cd7fee4a9c23105a320b0dca9979910a Mon Sep 17 00:00:00 2001 From: Akram Date: Mon, 26 Aug 2024 10:52:19 +0200 Subject: [PATCH] Add comments and refactor functions from agent to trajectory_buffer class --- lib/APPReinforcementLearning/src/App.h | 2 + webots/controllers/RL_Supervisor/agent.py | 341 ++++++++---------- webots/controllers/RL_Supervisor/plotting.py | 5 + .../RL_Supervisor/rl_supervisor.py | 26 +- .../RL_Supervisor/trajectory_buffer.py | 147 +++++++- 5 files changed, 311 insertions(+), 210 deletions(-) diff --git a/lib/APPReinforcementLearning/src/App.h b/lib/APPReinforcementLearning/src/App.h index c8dca20..bd3c6a1 100644 --- a/lib/APPReinforcementLearning/src/App.h +++ b/lib/APPReinforcementLearning/src/App.h @@ -154,6 +154,8 @@ class App /** * Send line sensors data via SerialMuxProt. + * + * * @return true if data has been sent, false otherwise. */ bool sendLineSensorsData() const; diff --git a/webots/controllers/RL_Supervisor/agent.py b/webots/controllers/RL_Supervisor/agent.py index eafb037..6949414 100644 --- a/webots/controllers/RL_Supervisor/agent.py +++ b/webots/controllers/RL_Supervisor/agent.py @@ -33,7 +33,7 @@ import numpy as np # pylint: disable=import-error import tensorflow as tf # pylint: disable=import-error import tensorflow_probability as tfp # pylint: disable=import-error -from trajectory_buffer import Memory +from trajectory_buffer import Buffer from networks import Networks ################################################################################ @@ -66,7 +66,7 @@ ] MAX_SENSOR_VALUE = 1000 MIN_STD_DEV = 0.1 # Minimum standard deviation -STD_DEV_FACTOR = 0.99995 # Discounter standard deviation factor +STD_DEV_FACTOR = 0.9995 # Discounter standard deviation factor ################################################################################ # Classes @@ -88,21 +88,20 @@ def __init__( batch_size=64, chkpt_dir="models/", top_speed=2000, - max_buffer_length=600, + max_buffer_length=65536, ): self.__serialmux = smp_server - self.__gamma = gamma self.__alpha = alpha - self.__gae_lambda = gae_lambda self.__policy_clip = policy_clip self.__chkpt_dir = chkpt_dir self.train_mode = False self.__top_speed = top_speed - self.__memory = Memory(batch_size, max_buffer_length) + self.__buffer = Buffer(batch_size, max_buffer_length, gamma, gae_lambda) self.__neural_network = Networks(self.__alpha) self.__training_index = 0 # Track batch index during training self.__current_batch = None # Saving of the current batch which is in process self.__std_dev = 0.9 + self.n_epochs = 10 self.done = False self.action = None self.value = None @@ -127,11 +126,20 @@ def set_drive_mode(self): self.num_episodes = 0 def store_transition( - self, state, action, probs, vals, reward, done + self, state, action, probs, value, reward, done ): # pylint: disable=too-many-arguments - """Store transitions in the replay buffer.""" + """Store transitions in the replay buffer. - self.__memory.store_memory(state, action, probs, vals, reward, done) + Parameters + ---------- + state: The state observed. + action: The action taken. + prob: The probability of taking the action. + value: The estimated value of the state. + reward: The reward received. + done: Indicating whether the target sequence has been reached. + """ + self.__buffer.store_memory(state, action, probs, value, reward, done) def save_models(self): """Saves the models in the specified file.""" @@ -150,13 +158,24 @@ def load_models(self): ) def predict_action(self, state): - """Predicts an action based on the current state.""" + """ + Predicts an action based on the current state. - # Conversion of the state into a tensor + Parameters + ---------- + state: The state observed. + + Returns + ---------- + float32: The action taken. + """ + # scales the sensor data to a range between 0 and 1 m_state = self.normalize_sensor_data(state) + + # Conversion of the state into a tensor state = tf.convert_to_tensor([m_state], dtype=tf.float32) - # output from the Actor Network. + # Forward pass through the actor network to get the action mean action_mean = self.__neural_network.actor_network(state) # Training mode is set. @@ -194,17 +213,24 @@ def predict_action(self, state): return self.action def send_motor_speeds(self, state): - """Sends the motor speeds to the robot.""" + """ + Sends the motor speeds to the robot. + Parameters + ---------- + state: The state observed. + """ + # pre_action contains the predicted action for the given state, calculated based + # on the Actor model output. pre_action = self.predict_action(state) # Get motor speed difference - speeddifference = self.__top_speed * pre_action + speed_difference = self.__top_speed * pre_action # Get individual motor speeds. The sign of speedDifference # determines if the robot turns left or right. - left_motor_speed = int(self.__top_speed - speeddifference) - right_motor_speed = int(self.__top_speed + speeddifference) + left_motor_speed = int(self.__top_speed - speed_difference) + right_motor_speed = int(self.__top_speed + speed_difference) control_data = struct.pack("2H", left_motor_speed, right_motor_speed) self.data_sent = self.__serialmux.send_data("SPEED_SET", control_data) @@ -214,26 +240,29 @@ def send_motor_speeds(self, state): self.unsent_data.append(("SPEED_SET", control_data)) def update(self, robot_node): - """Checks if the sequence has ended, performs updates, and saves the models.""" + """ + Checks if the sequence has ended and performs updates. + + Parameters + ---------- + robot_node: The Robot interface + """ # Checks whether the sequence has ended if it is set to Training mode. if (self.train_mode is True) and ( - (self.done is True) or (self.__memory.is_memory_full() is True) + (self.done is True) or (self.__buffer.is_memory_full() is True) ): cmd_payload = struct.pack("B", CMD_ID_SET_TRAINING_STATE) self.data_sent = self.__serialmux.send_data("CMD", cmd_payload) - # Failed to send data. Appends the data to unsent_data List + # Failed to send data. Appends the data to unsent_data List. if self.data_sent is False: self.unsent_data.append(("CMD", cmd_payload)) + # Stopping condition for sequence was reached. self.reinitialize(robot_node) self.state = "TRAINING" - # save models - if self.__memory.get_sum_rewards() >= 1000.0: - self.save_models() - # Checks whether the sequence has ended if it is set to driving mode. if (self.train_mode is False) and (self.done is True): self.done = False @@ -255,205 +284,131 @@ def update(self, robot_node): self.unsent_data.append(("CMD", cmd_payload)) def normalize_sensor_data(self, sensor_data): - """The normalize_sensor_data function scales the sensor data to a range between 0 and 1.""" - - sensor_data = np.array(sensor_data) / MAX_SENSOR_VALUE - return sensor_data - - def calculate_position(self, sensor_data): """ - Determines the deviation and returns an estimated position of the robot - with respect to a line. The estimate is made using a weighted average of - the sensor indices multiplied by 1000, so that a return value of 0 - indicates that the line is directly below sensor 0, a return value of - 1000 indicates that the line is directly below sensor 1, 2000 - indicates that it's below sensor 2000, etc. Intermediate values - indicate that the line is between two sensors. The formula is: - - 0*value0 + 1000*value1 + 2000*value2 + ... - -------------------------------------------- - value0 + value1 + value2 + ... - - This function assumes a dark line (high values) surrounded by white - (low values). - + The normalize_sensor_data function scales the sensor data to a range between 0 and 1. + Parameters ---------- - sensor_data : List + sensor_data: The state observed. Returns ---------- - Estimated position with respect to track. + NumPy array of float32: Normalized Sensor Data """ - estimated_pos = 0 - numerator = 0 - denominator = 0 - weight = 1000 - is_on_line = False - - max_sensors = len(sensor_data) - sensor_max_value = 1000 - last_pos_value = 0 - - for idx, sensor_value in enumerate(sensor_data): - numerator += idx * weight * sensor_value - denominator += sensor_value - - # Keep track of whether we see the line at all. - if LINE_SENSOR_ON_TRACK_MIN_VALUE < sensor_value: - is_on_line = True - - if False is is_on_line: - - # If it last read to the left of center, return 0. - if last_pos_value < ((max_sensors - 1) * sensor_max_value) / 2: - estimated_pos = 0 - - # If it last read to the right of center, return the max. value. - else: - estimated_pos = (max_sensors - 1) * sensor_max_value - - else: - # Check to avoid division by zero.. - if denominator != 0: - estimated_pos = numerator / denominator - - last_pos_value = estimated_pos - - return estimated_pos + sensor_data = np.array(sensor_data) / MAX_SENSOR_VALUE + return sensor_data - def calculate_reward(self, sensor_data): + def determine_reward(self, sensor_data): """ - The calculate_reward function evaluates the consequences of a certain + The function evaluates the consequences of a certain action performed in a certain state by calculating the resulting reward. A reward of 1 means that the robot is in the center of the Line. - """ - estimated_pos = self.calculate_position(sensor_data) - # Reward scaled between 0 and 1 If robot is in line. - if 500 <= estimated_pos <= 2000: - reward = (((1 / 150) * estimated_pos) - (10 / 3)) / 10 - return reward + Parameters + ---------- + sensor_data : The state observed. - # Reward scaled between 1 and 0 If robot is in line. - if 2000 < estimated_pos <= 3500: - reward = (((-1 / 150) * estimated_pos) + (70 / 3))/10 - return reward + Returns + ---------- + float: the Resulting Reward - return 0 + """ + reward = self.__buffer.calculate_reward(sensor_data) + return reward - def calculate_advantages(self, rewards, values, dones): - """Calculate advantages for each state in a mini-batch.""" + # pylint: disable=too-many-arguments + # pylint: disable=too-many-locals + def learn(self, states, actions, old_probs, values, rewards, dones): + """ + Perform training to optimize model weights. - mini_batch_size = len(rewards) + Parameters + ---------- + states: The saved states observed during interactions with the environment. + actions: The saved actions taken in response to the observed states. + old_probs: The saved probabilities of the actions taken, based on the previous policy. + values: The saved estimated values of the observed states. + rewards: The saved rewards received for taking the actions. + dones: The saved flags indicating whether the target sequence or episode has + been completed. + """ + for _ in range(self.n_epochs): - # Create empty advantages array - advantages = np.zeros(mini_batch_size, dtype=np.float32) + #the computed advantage values for each state in a given batch of experiences. + advantages = self.__buffer.calculate_advantages(rewards, values, dones) - for start_index in range(mini_batch_size): - discount = 1 - advantage = 0 + # optimize Actor Network weights + with tf.GradientTape() as tape: + states = tf.convert_to_tensor(states) + actions = tf.convert_to_tensor(actions) + old_probs = tf.convert_to_tensor(old_probs) - for future_index in range(start_index, mini_batch_size - 1): + # Forward pass through the actor network to get the action mean + predict_mean = self.__neural_network.actor_network(states) - # Calculate the temporal difference (TD) - delta = ( - rewards[future_index] - + ( - self.__gamma - * values[future_index + 1] - * (1 - int(dones[future_index])) - ) - - values[future_index] - ) + # Create the normal distribution with the predicted mean + new_dist = tfp.distributions.Normal(predict_mean, self.__std_dev) - # Accumulate the advantage using the discount factor - advantage += discount * delta + # Invert the tanh transformation to recover the original actions before tanh + untransformed_actions = tf.atanh(actions) - # Update the discount factor for the next step - discount *= self.__gamma * self.__gae_lambda + new_log_prob = new_dist.log_prob(untransformed_actions) - # Stop if a terminal state is reached - if dones[future_index]: - break + # Compute the log of the Jacobian for the tanh transformation + jacobian_log_det = tf.math.log(1 - tf.square(actions)) + adjusted_new_log_prob = new_log_prob - jacobian_log_det - # Save the calculated advantage for the current state - advantages[start_index] = advantage + # The ratio between the new model and the old model’s action log probabilities + prob_ratio = tf.exp(adjusted_new_log_prob - old_probs) - return advantages + # If the ratio is too large or too small, it will be + # clipped according to the surrogate function. + weighted_probs = prob_ratio * advantages + clipped_probs = tf.clip_by_value( + prob_ratio, 1 - self.__policy_clip, 1 + self.__policy_clip + ) + weighted_clipped_probs = clipped_probs * advantages - # pylint: disable=too-many-arguments - # pylint: disable=too-many-locals - def learn(self, states, actions, old_probs, values, rewards, dones): - """Perform training to optimize model weights""" - - advantages = self.calculate_advantages(rewards, values, dones) - - # optimize Actor Network weights - with tf.GradientTape() as tape: - states = tf.convert_to_tensor(states) - actions = tf.convert_to_tensor(actions) - old_probs = tf.convert_to_tensor(old_probs) - - # Predicts an action based on the Saved states - predict = self.__neural_network.actor_network(states) - new_dist = tfp.distributions.Normal(predict, 0.1) - new_log_prob = new_dist.log_prob(actions) - transformed_action = tf.tanh(actions) - jacobian_log_det = tf.math.log(1 - tf.square(transformed_action)) - adjusted_new_log_prob = new_log_prob - jacobian_log_det - - # The ratio between the new model and the old model’s action log probabilities - prob_ratio = tf.exp(adjusted_new_log_prob - old_probs) - - # If the ratio is too large or too small, it will be - # clipped according to the surrogate function. - weighted_probs = prob_ratio * advantages - clipped_probs = tf.clip_by_value( - prob_ratio, 1 - self.__policy_clip, 1 + self.__policy_clip - ) - weighted_clipped_probs = clipped_probs * advantages + # Policy Gradient Loss + actor_loss = -tf.reduce_mean( + tf.minimum(weighted_probs, weighted_clipped_probs) + ) - # Policy Gradient Loss - actor_loss = -tf.reduce_mean( - tf.minimum(weighted_probs, weighted_clipped_probs) + # calculate gradient + actor_params = self.__neural_network.actor_network.trainable_variables + actor_grads = tape.gradient(actor_loss, actor_params) + self.__neural_network.actor_optimizer.apply_gradients( + zip(actor_grads, actor_params) ) - # calculate gradient - actor_params = self.__neural_network.actor_network.trainable_variables - actor_grads = tape.gradient(actor_loss, actor_params) - self.__neural_network.actor_optimizer.apply_gradients( - zip(actor_grads, actor_params) - ) - - # optimize Critic Network weights - with tf.GradientTape() as tape: + # optimize Critic Network weights + with tf.GradientTape() as tape: - # The critical value represents the expected return from state 𝑠𝑡. - # It provides an estimate of how good it is to be in a given state. - critic_value = self.__neural_network.critic_network(states) + # The critical value represents the expected return from state 𝑠𝑡. + # It provides an estimate of how good it is to be in a given state. + critic_value = self.__neural_network.critic_network(states) - # the total discounted reward accumulated from time step 𝑡 - estimate_returns = advantages + values + # the total discounted reward accumulated from time step 𝑡 + estimate_returns = advantages + values - # Generate loss - critic_loss = tf.math.reduce_mean( - tf.math.pow(estimate_returns - critic_value, 2) - ) + # Generate loss + critic_loss = tf.math.reduce_mean( + tf.math.pow(estimate_returns - critic_value, 2) + ) - # calculate gradient - critic_params = self.__neural_network.critic_network.trainable_variables - critic_grads = tape.gradient(critic_loss, critic_params) - self.__neural_network.critic_optimizer.apply_gradients( - zip(critic_grads, critic_params) - ) - self.actor_loss_history.append(actor_loss.numpy()) - self.critic_loss_history.append(critic_loss.numpy()) - self.reward_history.append(sum(rewards)) + # calculate gradient + critic_params = self.__neural_network.critic_network.trainable_variables + critic_grads = tape.gradient(critic_loss, critic_params) + self.__neural_network.critic_optimizer.apply_gradients( + zip(critic_grads, critic_params) + ) + self.actor_loss_history.append(actor_loss.numpy()) + self.critic_loss_history.append(critic_loss.numpy()) + self.reward_history.append(sum(rewards)) - # saving logs in a CSV file - self.save_logs_to_csv() + # saving logs in a CSV file + self.save_logs_to_csv() def save_logs_to_csv(self): """Function for saving logs in a CSV file""" @@ -481,7 +436,7 @@ def perform_training(self): if self.__current_batch is None: # Grab sample from memory - self.__current_batch = self.__memory.generate_batches() + self.__current_batch = self.__buffer.generate_batches() # Perform training with mini batches. if self.__training_index < len(self.__current_batch[-1]): @@ -511,7 +466,7 @@ def perform_training(self): self.__training_index = 0 self.__current_batch = None self.done = False - self.__memory.clear_memory() + self.__buffer.clear_memory() self.state = "IDLE" self.num_episodes += 1 cmd_payload = struct.pack("B", CMD_ID_SET_READY_STATE) @@ -526,7 +481,13 @@ def perform_training(self): self.__std_dev = max(self.__std_dev, MIN_STD_DEV) def reinitialize(self, robot_node): - """Re-initialization of position and orientation of The ROBOT.""" + """ + Re-initialization of position and orientation of The ROBOT. + + Parameters + ---------- + robot_node: The Robot interface + """ trans_field = robot_node.getField("translation") rot_field = robot_node.getField("rotation") initial_position = POSITION_DATA diff --git a/webots/controllers/RL_Supervisor/plotting.py b/webots/controllers/RL_Supervisor/plotting.py index 4b20bf6..8efb92b 100644 --- a/webots/controllers/RL_Supervisor/plotting.py +++ b/webots/controllers/RL_Supervisor/plotting.py @@ -1,5 +1,10 @@ """ Plotting script with Matplotlib """ +# Description: This script generates three separate plots using Matplotlib to visualize +# the Actor Loss, Critic Loss, and Reward over time based on data collected during the +# training process. Each plot represents the respective metric as a function of +# mini-batch steps. + # Imports import matplotlib.pyplot as plt # pylint: disable=import-error import pandas as pd # pylint: disable=import-error diff --git a/webots/controllers/RL_Supervisor/rl_supervisor.py b/webots/controllers/RL_Supervisor/rl_supervisor.py index 1e13460..8ae0838 100644 --- a/webots/controllers/RL_Supervisor/rl_supervisor.py +++ b/webots/controllers/RL_Supervisor/rl_supervisor.py @@ -132,7 +132,7 @@ def callback_line_sensors(self, payload: bytearray) -> None: if self.__no_line_detection_count > 0: reward = -1 else: - reward = self.__agent.calculate_reward(sensor_data) + reward = self.__agent.determine_reward(sensor_data) # Start storage The data after the second received sensor data if self.last_sensor_data is not None: @@ -272,17 +272,23 @@ def main_loop(): while supervisor.step(timestep) != -1: controller.process() - # Checks if the sequence has ended - if agent.state == "READY": - agent.update(robot_node) + match (agent.state): + case "READY": + agent.update(robot_node) - # Start the training - elif agent.state == "TRAINING": - controller.steps = 0 - agent.perform_training() + case "TRAINING": + controller.steps = 0 + agent.perform_training() + print(f"#{agent.num_episodes} actor loss: {agent.actor_loss_history[-1]:.4f}," + f"critic loss: {agent.critic_loss_history[-1]:.4f}") - if 1000 <= agent.num_episodes: - print(f"The number of episodes:{agent.num_episodes}") + # save model + if (agent.num_episodes > 1) and (agent.num_episodes % 50 == 0): + print(f"The number of episodes:{agent.num_episodes}") + agent.save_models() + + case "IDLE": + pass # Resent any unsent Data if agent.unsent_data: diff --git a/webots/controllers/RL_Supervisor/trajectory_buffer.py b/webots/controllers/RL_Supervisor/trajectory_buffer.py index 01767d0..44f17fc 100644 --- a/webots/controllers/RL_Supervisor/trajectory_buffer.py +++ b/webots/controllers/RL_Supervisor/trajectory_buffer.py @@ -40,18 +40,23 @@ ################################################################################ -class Memory: # pylint: disable=too-many-instance-attributes +class Buffer: # pylint: disable=too-many-instance-attributes """Class for store and manage experience tuples during reinforcement learning""" - def __init__(self, batch_size, max_length): + # pylint: disable=too-many-arguments + def __init__(self, batch_size, max_length, gamma, gae_lambda): self.__states = [] self.__probs = [] self.__vals = [] self.__actions = [] self.__rewards = [] self.__dones = [] + self.__batch_size = batch_size self.__max_length = max_length self.__batch_size = batch_size + self.__gamma = gamma + self.__gae_lambda = gae_lambda + self.__current_index = 0 def generate_batches(self): """ @@ -76,11 +81,9 @@ def generate_batches(self): # Create indices for the states and mix them randomly indices = np.arange(n_states, dtype=np.int64) - np.random.shuffle(indices) # Create batches by dividing the indices into groups of the batch_size batches = [indices[indx : indx + self.__batch_size] for indx in batch_start] - print("Number of Steps:", n_states) return ( np.array(self.__states), @@ -93,7 +96,13 @@ def generate_batches(self): ) def get_sum_rewards(self) -> float: - """Calculate total rewards""" + """ + Calculate total rewards. + + Returns + ---------- + float: Total rewards received. + """ sum_rewards = sum(self.__rewards) return sum_rewards @@ -108,8 +117,8 @@ def store_memory( ---------- state: The state observed. action: The action taken. - prob: The probability of taking the action. - val: The estimated value of the state. + probs: The probability of taking the action. + vals: The estimated value of the state. reward: The reward received. done: Whether the episode is done. """ @@ -119,9 +128,10 @@ def store_memory( self.__vals.append(vals) self.__rewards.append(reward) self.__dones.append(done) + self.__current_index += 1 def clear_memory(self): - """Remove transitions from the replay buffer.""" + """Remove transitions from the trajektories buffer.""" self.__states = [] self.__probs = [] @@ -132,12 +142,129 @@ def clear_memory(self): def is_memory_full(self): """ - Checks whether Memory has reached its maximum capacity. + Checks whether Memory has reached its maximum capacity. Returns ---------- - Bool: Memory is full or not """ - is_full = len(self.__states) >= self.__max_length + is_full = self.__current_index >= self.__max_length return is_full + + def calculate_advantages(self, rewards, values, dones): + """ + The function measures how much better or worse an action + performed in a given state compared to the average action + the policy would take in that state. + + Parameters + ---------- + rewards: The rewards received. + values: The estimated values of the states. + dones: Indicating whether the target sequence has been reached. + + Returns + ---------- + NumPy array of float32: the computed advantage values for each + state in a given batch of experiences. + """ + + mini_batch_size = rewards.shape[0] + + # Create empty advantages array + advantages = np.zeros(mini_batch_size, dtype=np.float32) + + for start_index in range(mini_batch_size): + discount = 1 + advantage = 0 + + for future_index in range(start_index, mini_batch_size - 1): + + # Calculate the temporal difference (TD) + delta = ( + rewards[future_index] + + ( + self.__gamma + * values[future_index + 1] + * (1 - int(dones[future_index])) + ) + - values[future_index] + ) + + # Accumulate the advantage using the discount factor + advantage += discount * delta + + # Update the discount factor for the next step + discount *= self.__gamma * self.__gae_lambda + + # Stop if a terminal state is reached + if dones[future_index]: + break + + # Save the calculated advantage for the current state + advantages[start_index] = advantage + + return advantages + + def calculate_position(self, sensor_data): + """ + Determines the deviation and returns an estimated position of the robot + with respect to a line. The estimate is made using a weighted average of + the sensor indices multiplied by 1000, so that a return value of 0 + indicates that the line is directly below sensor 0, a return value of + 1000 indicates that the line is directly below sensor 1, 2000 + indicates that it's below sensor 2000, etc. Intermediate values + indicate that the line is between two sensors. The formula is: + + 0*value0 + 1000*value1 + 2000*value2 + ... + -------------------------------------------- + value0 + value1 + value2 + ... + + This function assumes a dark line (high values) surrounded by white + (low values). + + Parameters + ---------- + sensor_data : The state observed. + + Returns + ---------- + float: Estimated position with respect to track. + """ + estimated_pos = 0.0 + numerator = 0.0 + denominator = 0.0 + weight = 1000.0 + + for idx, sensor_value in enumerate(sensor_data): + numerator += idx * weight * sensor_value + denominator += sensor_value + + if denominator > 0: + estimated_pos = numerator / denominator + + return estimated_pos + + def calculate_reward(self, sensor_data): + """ + The calculate_reward function evaluates the consequences of a certain + action performed in a certain state by calculating the resulting reward. + A reward of 1 means that the robot is in the center of the Line. + + Parameters + ---------- + sensor_data : The state observed. + + Returns + ---------- + float: the Resulting Reward + """ + reward = 0.0 + estimated_pos = self.calculate_position(sensor_data) + + # Reward scaled between 0 and 1 based on the estimated position + if 500 <= estimated_pos <= 3500: + reward = 1.0 - abs(estimated_pos - 2000.0) / 1500.0 + + return reward