From 0a5001cc8286c0654e84ca5bdf21ff1e5f2f9261 Mon Sep 17 00:00:00 2001 From: Akram Date: Mon, 19 Aug 2024 08:17:02 +0200 Subject: [PATCH] added comments, renamed variables, and updated values --- webots/controllers/RL_Supervisor/agent.py | 64 ++++++++++--------- webots/controllers/RL_Supervisor/networks.py | 27 ++++++-- .../RL_Supervisor/rl_supervisor.py | 17 +++-- 3 files changed, 70 insertions(+), 38 deletions(-) diff --git a/webots/controllers/RL_Supervisor/agent.py b/webots/controllers/RL_Supervisor/agent.py index 2d071d3..19fa550 100644 --- a/webots/controllers/RL_Supervisor/agent.py +++ b/webots/controllers/RL_Supervisor/agent.py @@ -102,7 +102,7 @@ def __init__( self.__neural_network = Networks(self.__alpha) self.__training_index = 0 # Track batch index during training self.__current_batch = None # Saving of the current batch which is in process - self.__std_dev = 1 + self.__std_dev = 0.9 self.done = False self.action = None self.value = None @@ -156,13 +156,14 @@ def predict_action(self, state): m_state = self.normalize_sensor_data(state) state = tf.convert_to_tensor([m_state], dtype=tf.float32) - # Calculation of probabilities by the Actor neural network - probs = self.__neural_network.actor_network(state) + # output from the Actor Network. + action_mean = self.__neural_network.actor_network(state) + # Training mode is set. if self.train_mode is True: - # Create a normal distribution with the calculated probabilities - # and the standard deviation - dist = tfp.distributions.Normal(probs, self.__std_dev) + + # Create a normal distribution + dist = tfp.distributions.Normal(action_mean, self.__std_dev) # Sampling an action from the normal distribution sampled_action = dist.sample() @@ -185,8 +186,10 @@ def predict_action(self, state): self.action = transformed_action.numpy()[0] self.value = value.numpy()[0] self.adjusted_log_prob = adjusted_log_prob.numpy()[0] + + # Driving mode is set else: - self.action = probs.numpy()[0] + self.action = action_mean.numpy()[0] return self.action @@ -240,6 +243,7 @@ def update(self, robot_node): self.data_sent = self.__serialmux.send_data( "SPEED_SET", motorcontrol ) # stop the motors immediately + # Failed to send data. Appends the data to unsent_data List if self.data_sent is False: self.unsent_data.append(("SPEED_SET", motorcontrol)) @@ -253,29 +257,9 @@ def update(self, robot_node): def normalize_sensor_data(self, sensor_data): """The normalize_sensor_data function scales the sensor data to a range between 0 and 1.""" - # Normalized sensor data sensor_data = np.array(sensor_data) / MAX_SENSOR_VALUE - return sensor_data - def calculate_reward(self, sensor_data): - """ - The calculate_reward function evaluates the consequences of a certain - action performed in a certain state by calculating the resulting reward - """ - estimated_pos = self.calculate_position(sensor_data) - - # Return reward between 0 and 10 - if 500 <= estimated_pos <= 2000: - reward = ((1 / 150) * estimated_pos) - (10 / 3) - return reward - - if 2000 < estimated_pos <= 3500: - reward = ((-1 / 150) * estimated_pos) + (70 / 3) - return reward - - return 0 - def calculate_position(self, sensor_data): """ Determines the deviation and returns an estimated position of the robot @@ -339,6 +323,26 @@ def calculate_position(self, sensor_data): return estimated_pos + def calculate_reward(self, sensor_data): + """ + The calculate_reward function evaluates the consequences of a certain + action performed in a certain state by calculating the resulting reward. + A reward of 1 means that the robot is in the center of the Line. + """ + estimated_pos = self.calculate_position(sensor_data) + + # Reward scaled between 0 and 1 If robot is in line. + if 500 <= estimated_pos <= 2000: + reward = (((1 / 150) * estimated_pos) - (10 / 3)) / 10 + return reward + + # Reward scaled between 1 and 0 If robot is in line. + if 2000 < estimated_pos <= 3500: + reward = (((-1 / 150) * estimated_pos) + (70 / 3))/10 + return reward + + return 0 + def calculate_advantages(self, rewards, values, dones): """Calculate advantages for each state in a mini-batch.""" @@ -386,7 +390,7 @@ def learn(self, states, actions, old_probs, values, rewards, dones): advantages = self.calculate_advantages(rewards, values, dones) - # optimize Actor Network weights + # optimize Actor Network weights with tf.GradientTape() as tape: states = tf.convert_to_tensor(states) actions = tf.convert_to_tensor(actions) @@ -426,7 +430,7 @@ def learn(self, states, actions, old_probs, values, rewards, dones): # optimize Critic Network weights with tf.GradientTape() as tape: - # The critical value represents the expected return from state 𝑠𝑡. + # The critical value represents the expected return from state 𝑠𝑡. # It provides an estimate of how good it is to be in a given state. critic_value = self.__neural_network.critic_network(states) @@ -479,7 +483,7 @@ def perform_training(self): # Grab sample from memory self.__current_batch = self.__memory.generate_batches() - # Perform training with mini batchtes. + # Perform training with mini batches. if self.__training_index < len(self.__current_batch[-1]): ( state_arr, diff --git a/webots/controllers/RL_Supervisor/networks.py b/webots/controllers/RL_Supervisor/networks.py index 84f9e98..f362459 100644 --- a/webots/controllers/RL_Supervisor/networks.py +++ b/webots/controllers/RL_Supervisor/networks.py @@ -30,6 +30,7 @@ import tensorflow as tf # pylint: disable=import-error from tensorflow.keras import layers # pylint: disable=import-error +from tensorflow.keras.regularizers import l2 # pylint: disable=import-error ################################################################################ # Variables @@ -59,22 +60,31 @@ def build_actor_network(self): 64, activation="relu", kernel_initializer="he_normal", + kernel_regularizer=l2(0.01), bias_initializer="zeros", )(state_input) fc2 = layers.Dense( 64, activation="relu", kernel_initializer="he_normal", + kernel_regularizer=l2(0.01), bias_initializer="zeros", )(fc1) - mu = layers.Dense( + fc3 = layers.Dense( + 32, + activation="relu", + kernel_initializer="he_normal", + kernel_regularizer=l2(0.01), + bias_initializer="zeros", + )(fc2) + mean = layers.Dense( 1, activation="tanh", kernel_initializer="glorot_uniform", bias_initializer="zeros", - )(fc2) + )(fc3) - return tf.keras.models.Model(inputs=state_input, outputs=mu) + return tf.keras.models.Model(inputs=state_input, outputs=mean) def build_critic_network(self): """Build Critic Network""" @@ -84,15 +94,24 @@ def build_critic_network(self): 64, activation="relu", kernel_initializer="he_normal", + kernel_regularizer=l2(0.01), bias_initializer="zeros", )(state_input) fc2 = layers.Dense( 64, activation="relu", kernel_initializer="he_normal", + kernel_regularizer=l2(0.01), bias_initializer="zeros", )(fc1) - value = layers.Dense(1)(fc2) # Value output + fc3 = layers.Dense( + 32, + activation="relu", + kernel_initializer="he_normal", + kernel_regularizer=l2(0.01), + bias_initializer="zeros", + )(fc2) + value = layers.Dense(1)(fc3) # Value output return tf.keras.models.Model(inputs=state_input, outputs=value) diff --git a/webots/controllers/RL_Supervisor/rl_supervisor.py b/webots/controllers/RL_Supervisor/rl_supervisor.py index fa7388a..1e13460 100644 --- a/webots/controllers/RL_Supervisor/rl_supervisor.py +++ b/webots/controllers/RL_Supervisor/rl_supervisor.py @@ -101,13 +101,14 @@ def callback_line_sensors(self, payload: bytearray) -> None: """Callback LINE_SENS Channel.""" sensor_data = struct.unpack("5H", payload) self.steps += 1 - # determine lost line condition + + # Determine lost line condition if all(value == 0 for value in sensor_data): self.__no_line_detection_count += 1 else: self.__no_line_detection_count = 0 - # detect start/stop line + # Detect start/stop line is_start_stop = all( value >= LINE_SENSOR_ON_TRACK_MIN_VALUE for value in sensor_data ) @@ -126,7 +127,12 @@ def callback_line_sensors(self, payload: bytearray) -> None: # The sequence of states and actions is stored in memory for the training phase. if self.__agent.train_mode: - reward = self.__agent.calculate_reward(sensor_data) + + # receive a -1 punishment if the robot leaves the line + if self.__no_line_detection_count > 0: + reward = -1 + else: + reward = self.__agent.calculate_reward(sensor_data) # Start storage The data after the second received sensor data if self.last_sensor_data is not None: @@ -159,6 +165,8 @@ def load_models(self, path) -> None: """Load Model if exist""" if os.path.exists(path): self.__agent.load_models() + else: + print("No model available") def retry_unsent_data(self, unsent_data: list) -> bool: """Resent any unsent Data""" @@ -259,6 +267,7 @@ def main_loop(): if status != -1: controller.load_models(PATH) + # simulation loop while supervisor.step(timestep) != -1: controller.process() @@ -273,7 +282,7 @@ def main_loop(): agent.perform_training() if 1000 <= agent.num_episodes: - print(f"Episodes: {agent.num_episodes}") + print(f"The number of episodes:{agent.num_episodes}") # Resent any unsent Data if agent.unsent_data: