From e136bf7826f63cf2a402af69a1200f6dd8d52168 Mon Sep 17 00:00:00 2001 From: Akram Date: Tue, 30 Jul 2024 15:11:05 +0200 Subject: [PATCH] implement new class-based architecture --- .../RL_Supervisor/rl_supervisor.py | 270 ++++++++++-------- 1 file changed, 152 insertions(+), 118 deletions(-) diff --git a/webots/controllers/RL_Supervisor/rl_supervisor.py b/webots/controllers/RL_Supervisor/rl_supervisor.py index b1dfc0fe..4c9e14e3 100644 --- a/webots/controllers/RL_Supervisor/rl_supervisor.py +++ b/webots/controllers/RL_Supervisor/rl_supervisor.py @@ -14,15 +14,21 @@ ROBOT_NAME = "ROBOT" SUPERVISOR_RX_NAME = "serialComRx" SUPERVISOR_TX_NAME = "serialComTx" + COMMAND_CHANNEL_NAME = "CMD" CMD_DLC = 1 -SPEED_SETPOINT_CHANNEL_NAME = "SPEED_SET" + +SPEED_SET_CHANNEL_NAME = "SPEED_SET" SPEED_SET_DLC = 4 + LINE_SENSOR_CHANNEL_NAME = "LINE_SENS" +LINE_SENSOR_ON_TRACK_MIN_VALUE = 200 + STATUS_CHANNEL_NAME = "STATUS" +STATUS_CHANNEL_ERROR_VAL = 1 + MODE_CHANNEL_NAME = "MODE" CMD_ID_SET_READY_STATE = 1 -LINE_SENSOR_ON_TRACK_MIN_VALUE = 200 POSITION_DATA = [-0.24713614078815466, -0.04863962992854465, 0.013994298332013683] ORIENTATION_DATA = [ -1.0564747468923541e-06, @@ -31,149 +37,177 @@ 1.5880805820884731, ] -# Counter for the number of consecutive times a line has not been detected. -no_line_detection_count = [0] -# The counter of the number of times it has been reset. -reset_count = [0] +class RobotController: + """Class for data flow control logic""" + + def __init__(self, robot_node, smp_server, tick_size): + self.__robot_node = robot_node + self.__smp_server = smp_server + self.__tick_size = tick_size + self.__no_line_detection_count = 0 + self.__timestamp = 0 # Elapsed time since reset [ms] + self.is_sequence_complete = False + + def reinitialize(self): + """Re-initialization of position and orientation of The ROBOT.""" + trans_field = self.__robot_node.getField("translation") + rot_field = self.__robot_node.getField("rotation") + initial_position = POSITION_DATA + initial_orientation = ORIENTATION_DATA + trans_field.setSFVec3f(initial_position) + rot_field.setSFRotation(initial_orientation) + + def callback_status(self, payload: bytearray) -> int: + """Callback Status Channel.""" + + # perform action on robot status feedback + if payload[0] == STATUS_CHANNEL_ERROR_VAL: + print("robot has reached error-state (max. lap time passed in robot)") + + # stop robot motors + self.__smp_server.send_data(SPEED_SET_CHANNEL_NAME, struct.pack("2H", 0, 0)) + + # switch robot to ready state + self.__smp_server.send_data( + COMMAND_CHANNEL_NAME, struct.pack("B", CMD_ID_SET_READY_STATE) + ) + + # perform robot state reset + self.is_sequence_complete = True + + def callback_line_sensors(self, payload: bytearray) -> None: + """Callback LINE_SENS Channel.""" + sensor_data = struct.unpack("5H", payload) + + for idx in range(5): + if idx == 4: + print(f"Sensor[{idx}] = {sensor_data[idx]}") + else: + print(f"Sensor[{idx}] = {sensor_data[idx]}, ", end="") + + # determine lost line condition + if all(value == 0 for value in sensor_data): + self.__no_line_detection_count += 1 + else: + self.__no_line_detection_count = 0 -# Create the Supervisor instance. -supervisor = Supervisor() + # detect start/stop line + is_start_stop = all( + value >= LINE_SENSOR_ON_TRACK_MIN_VALUE for value in sensor_data + ) -# Get the time step of the current world. -timestep = int(supervisor.getBasicTimeStep()) + # sequence stop criterion debounce no line detection and + if (self.__no_line_detection_count >= 20) or is_start_stop: + # Stop motors, maximum NO Line Detection Counter reached + self.__smp_server.send_data(SPEED_SET_CHANNEL_NAME, struct.pack("2H", 0, 0)) -# Get the Supervisor Receiver Device -supervisor_com_rx = supervisor.getDevice(SUPERVISOR_RX_NAME) + # SENDING A COMMAND ID SET READY STATUS + self.__smp_server.send_data( + COMMAND_CHANNEL_NAME, struct.pack("B", CMD_ID_SET_READY_STATE) + ) + self.is_sequence_complete = True -# Get the Supervisor Emitter device -supervisor_com_tx = supervisor.getDevice(SUPERVISOR_TX_NAME) + else: + # [PLACEHOLDER] replace with line sensor data receiver -# Set serial rx/tx channels for communication with the Roboter. -supervisor_com_rx.setChannel(2) -supervisor_com_tx.setChannel(1) + # Set left and right motors speed to 1000 + self.__smp_server.send_data( + SPEED_SET_CHANNEL_NAME, struct.pack("2H", 1000, 1000) + ) -# Get robot node which to observe. -robot_node = supervisor.getFromDef(ROBOT_NAME) + def callback_mode(self, payload: bytearray) -> None: + """Callback MODE Channel.""" + driving_mode = payload[0] -# SerialMuxProt Server Instance. -s_client = SerialWebots(supervisor_com_tx, supervisor_com_rx) -smp_server = Server(10, s_client) + if driving_mode: + print("Driving Mode Selected.") + else: + print("Train Mode Selected.") -# Channel creation. -serialMuxProtChannelIdSPEED_SET = smp_server.create_channel( - SPEED_SETPOINT_CHANNEL_NAME, SPEED_SET_DLC -) -serialMuxProtChannelIdCMD = smp_server.create_channel(COMMAND_CHANNEL_NAME, CMD_DLC) + def process(self): + """function performing controller step""" + self.__timestamp += self.__tick_size -# Enable supervisor receiver to receive any message from the robot or other -# devices in the simulation. -if supervisor_com_rx is None: - print( - f"No supervisor communication possible, because {SUPERVISOR_RX_NAME} not found." - ) -else: - supervisor_com_rx.enable(timestep) - - -def reinitialize(): - """Re-initialization of position and orientation of The ROBOT.""" - trans_field = robot_node.getField("translation") - rot_field = robot_node.getField("rotation") - intial_position = POSITION_DATA - initial_orientation = ORIENTATION_DATA - trans_field.setSFVec3f(intial_position) - rot_field.setSFRotation(initial_orientation) - reset_count[0] = 0 - - -def callback_status(payload: bytearray) -> int: - """Callback Status Channel.""" - if payload[0] == 1: - print("the max. time within the robot must be finished its drive is Done") - smp_server.send_data( - SPEED_SETPOINT_CHANNEL_NAME, struct.pack("2H", 0, 0) - ) # Stop motors - smp_server.send_data( - COMMAND_CHANNEL_NAME, struct.pack("B", CMD_ID_SET_READY_STATE) - ) # SENDING A COMMAND_ID SET READY STATUS - reinitialize() - - -def callback_line_sensors(payload: bytearray) -> None: - """Callback LINE_SENS Channel.""" - # sensor_data now contains a tuple of 5 unsigned short integers extracted from the payload - sensor_data = struct.unpack("5H", payload) - - for idx in range(5): - if idx == 4: - print(f"Sensor[{idx}] = {sensor_data[idx]}") - else: - print(f"Sensor[{idx}] = {sensor_data[idx]}, ", end="") + # process new data (callbacks will be executed) + self.__smp_server.process(self.__timestamp) - if all(value == 0 for value in sensor_data): - no_line_detection_count[0] += 1 - else: - no_line_detection_count[0] = 0 - if no_line_detection_count[0] >= 20 or all( - value >= LINE_SENSOR_ON_TRACK_MIN_VALUE for value in sensor_data - ): +def main_loop(): + """Main loop: + - Perform simulation steps until Webots is stopping the controller. - # Stop motors, maximum NO Line Detection Counter reached - smp_server.send_data(SPEED_SETPOINT_CHANNEL_NAME, struct.pack("2H", 0, 0)) + Returns: + number: Status + """ + status = 0 - # SENDING A COMMAND ID SET READY STATUS - smp_server.send_data( - COMMAND_CHANNEL_NAME, struct.pack("B", CMD_ID_SET_READY_STATE) - ) - reset_count[0] += 1 + supervisor = Supervisor() + timestep = int(supervisor.getBasicTimeStep()) + # get serial receiver from supervisor + supervisor_com_rx = supervisor.getDevice(SUPERVISOR_RX_NAME) + if supervisor_com_rx is None: + print(f"ERROR: {SUPERVISOR_RX_NAME} not found.") + status = -1 + else: + supervisor_com_rx.enable(timestep) + supervisor_com_rx.setChannel(2) + + # get serial emitter from supervisor + supervisor_com_tx = supervisor.getDevice(SUPERVISOR_TX_NAME) + if supervisor_com_tx is None: + print(f"ERROR: {SUPERVISOR_TX_NAME} not found.") + status = -1 else: - # Set left and right motors speed to 1000 - smp_server.send_data(SPEED_SETPOINT_CHANNEL_NAME, struct.pack("2H", 1000, 1000)) + # supervisor_com_tx.enable(timestep) + supervisor_com_tx.setChannel(1) + # get robot defition + robot_node = supervisor.getFromDef(ROBOT_NAME) + if robot_node is None: + print(f"ERROR: {ROBOT_NAME} not found.") + status = -1 -def callback_mode(payload: bytearray) -> None: - """Callback MODE Channel.""" - driving_mode = payload[0] + # connect webots serial nodes to SerMuxProt + s_client = SerialWebots(supervisor_com_tx, supervisor_com_rx) + smp_server = Server(10, s_client) - if driving_mode: - print("Driving Mode Selected.") - else: - print("Train Mode Selected.") + sermux_channel_speed_set = smp_server.create_channel( + SPEED_SET_CHANNEL_NAME, SPEED_SET_DLC + ) + sermux_channel_cmd_id = smp_server.create_channel(COMMAND_CHANNEL_NAME, CMD_DLC) + if sermux_channel_speed_set == 0: + print("ERROR: channel SPEED_SET not created.") + status = -1 -def main_loop(): - """Main loop: - - Perform simulation steps until Webots is stopping the controller- + if sermux_channel_cmd_id == 0: + print("ERROR: channel CMD not created.") + status = -1 - Returns: - number: Status - """ - status = 0 - elapsed_time_since_reset = 0 # Elapsed time since reset [ms] + # create instance of robot logic class + controller = RobotController(robot_node, smp_server, timestep) - # Channel subscription. - smp_server.subscribe_to_channel(STATUS_CHANNEL_NAME, callback_status) - smp_server.subscribe_to_channel(LINE_SENSOR_CHANNEL_NAME, callback_line_sensors) - smp_server.subscribe_to_channel(MODE_CHANNEL_NAME, callback_mode) + smp_server.subscribe_to_channel(STATUS_CHANNEL_NAME, controller.callback_status) - if serialMuxProtChannelIdSPEED_SET == 0 or serialMuxProtChannelIdCMD == 0: - print("Channel SPEED_SET not created.") - else: + smp_server.subscribe_to_channel( + LINE_SENSOR_CHANNEL_NAME, controller.callback_line_sensors + ) - if robot_node is None: - print(f"Robot DEF {ROBOT_NAME} not found.") - status = -1 + smp_server.subscribe_to_channel(MODE_CHANNEL_NAME, controller.callback_mode) - else: - while supervisor.step(timestep) != -1: - elapsed_time_since_reset += timestep - smp_server.process(elapsed_time_since_reset) + # setup successful + if status != -1: + # simulation loop + while supervisor.step(timestep) != -1: + controller.process() + + # stopping condition for sequence was reached + if controller.is_sequence_complete is True: + controller.reinitialize() + controller.is_sequence_complete = False - if reset_count[0] != 0: - reinitialize() return status