Skip to content

Commit

Permalink
implement new class-based architecture
Browse files Browse the repository at this point in the history
  • Loading branch information
Akram authored and Akram committed Jul 30, 2024
1 parent 65b3c6d commit e136bf7
Showing 1 changed file with 152 additions and 118 deletions.
270 changes: 152 additions & 118 deletions webots/controllers/RL_Supervisor/rl_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,21 @@
ROBOT_NAME = "ROBOT"
SUPERVISOR_RX_NAME = "serialComRx"
SUPERVISOR_TX_NAME = "serialComTx"

COMMAND_CHANNEL_NAME = "CMD"
CMD_DLC = 1
SPEED_SETPOINT_CHANNEL_NAME = "SPEED_SET"

SPEED_SET_CHANNEL_NAME = "SPEED_SET"
SPEED_SET_DLC = 4

LINE_SENSOR_CHANNEL_NAME = "LINE_SENS"
LINE_SENSOR_ON_TRACK_MIN_VALUE = 200

STATUS_CHANNEL_NAME = "STATUS"
STATUS_CHANNEL_ERROR_VAL = 1

MODE_CHANNEL_NAME = "MODE"
CMD_ID_SET_READY_STATE = 1
LINE_SENSOR_ON_TRACK_MIN_VALUE = 200
POSITION_DATA = [-0.24713614078815466, -0.04863962992854465, 0.013994298332013683]
ORIENTATION_DATA = [
-1.0564747468923541e-06,
Expand All @@ -31,149 +37,177 @@
1.5880805820884731,
]

# Counter for the number of consecutive times a line has not been detected.
no_line_detection_count = [0]

# The counter of the number of times it has been reset.
reset_count = [0]
class RobotController:
"""Class for data flow control logic"""

def __init__(self, robot_node, smp_server, tick_size):
self.__robot_node = robot_node
self.__smp_server = smp_server
self.__tick_size = tick_size
self.__no_line_detection_count = 0
self.__timestamp = 0 # Elapsed time since reset [ms]
self.is_sequence_complete = False

def reinitialize(self):
"""Re-initialization of position and orientation of The ROBOT."""
trans_field = self.__robot_node.getField("translation")
rot_field = self.__robot_node.getField("rotation")
initial_position = POSITION_DATA
initial_orientation = ORIENTATION_DATA
trans_field.setSFVec3f(initial_position)
rot_field.setSFRotation(initial_orientation)

def callback_status(self, payload: bytearray) -> int:
"""Callback Status Channel."""

# perform action on robot status feedback
if payload[0] == STATUS_CHANNEL_ERROR_VAL:
print("robot has reached error-state (max. lap time passed in robot)")

# stop robot motors
self.__smp_server.send_data(SPEED_SET_CHANNEL_NAME, struct.pack("2H", 0, 0))

# switch robot to ready state
self.__smp_server.send_data(
COMMAND_CHANNEL_NAME, struct.pack("B", CMD_ID_SET_READY_STATE)
)

# perform robot state reset
self.is_sequence_complete = True

def callback_line_sensors(self, payload: bytearray) -> None:
"""Callback LINE_SENS Channel."""
sensor_data = struct.unpack("5H", payload)

for idx in range(5):
if idx == 4:
print(f"Sensor[{idx}] = {sensor_data[idx]}")
else:
print(f"Sensor[{idx}] = {sensor_data[idx]}, ", end="")

# determine lost line condition
if all(value == 0 for value in sensor_data):
self.__no_line_detection_count += 1
else:
self.__no_line_detection_count = 0

# Create the Supervisor instance.
supervisor = Supervisor()
# detect start/stop line
is_start_stop = all(
value >= LINE_SENSOR_ON_TRACK_MIN_VALUE for value in sensor_data
)

# Get the time step of the current world.
timestep = int(supervisor.getBasicTimeStep())
# sequence stop criterion debounce no line detection and
if (self.__no_line_detection_count >= 20) or is_start_stop:
# Stop motors, maximum NO Line Detection Counter reached
self.__smp_server.send_data(SPEED_SET_CHANNEL_NAME, struct.pack("2H", 0, 0))

# Get the Supervisor Receiver Device
supervisor_com_rx = supervisor.getDevice(SUPERVISOR_RX_NAME)
# SENDING A COMMAND ID SET READY STATUS
self.__smp_server.send_data(
COMMAND_CHANNEL_NAME, struct.pack("B", CMD_ID_SET_READY_STATE)
)
self.is_sequence_complete = True

# Get the Supervisor Emitter device
supervisor_com_tx = supervisor.getDevice(SUPERVISOR_TX_NAME)
else:
# [PLACEHOLDER] replace with line sensor data receiver

# Set serial rx/tx channels for communication with the Roboter.
supervisor_com_rx.setChannel(2)
supervisor_com_tx.setChannel(1)
# Set left and right motors speed to 1000
self.__smp_server.send_data(
SPEED_SET_CHANNEL_NAME, struct.pack("2H", 1000, 1000)
)

# Get robot node which to observe.
robot_node = supervisor.getFromDef(ROBOT_NAME)
def callback_mode(self, payload: bytearray) -> None:
"""Callback MODE Channel."""
driving_mode = payload[0]

# SerialMuxProt Server Instance.
s_client = SerialWebots(supervisor_com_tx, supervisor_com_rx)
smp_server = Server(10, s_client)
if driving_mode:
print("Driving Mode Selected.")
else:
print("Train Mode Selected.")

# Channel creation.
serialMuxProtChannelIdSPEED_SET = smp_server.create_channel(
SPEED_SETPOINT_CHANNEL_NAME, SPEED_SET_DLC
)
serialMuxProtChannelIdCMD = smp_server.create_channel(COMMAND_CHANNEL_NAME, CMD_DLC)
def process(self):
"""function performing controller step"""
self.__timestamp += self.__tick_size

# Enable supervisor receiver to receive any message from the robot or other
# devices in the simulation.
if supervisor_com_rx is None:
print(
f"No supervisor communication possible, because {SUPERVISOR_RX_NAME} not found."
)
else:
supervisor_com_rx.enable(timestep)


def reinitialize():
"""Re-initialization of position and orientation of The ROBOT."""
trans_field = robot_node.getField("translation")
rot_field = robot_node.getField("rotation")
intial_position = POSITION_DATA
initial_orientation = ORIENTATION_DATA
trans_field.setSFVec3f(intial_position)
rot_field.setSFRotation(initial_orientation)
reset_count[0] = 0


def callback_status(payload: bytearray) -> int:
"""Callback Status Channel."""
if payload[0] == 1:
print("the max. time within the robot must be finished its drive is Done")
smp_server.send_data(
SPEED_SETPOINT_CHANNEL_NAME, struct.pack("2H", 0, 0)
) # Stop motors
smp_server.send_data(
COMMAND_CHANNEL_NAME, struct.pack("B", CMD_ID_SET_READY_STATE)
) # SENDING A COMMAND_ID SET READY STATUS
reinitialize()


def callback_line_sensors(payload: bytearray) -> None:
"""Callback LINE_SENS Channel."""
# sensor_data now contains a tuple of 5 unsigned short integers extracted from the payload
sensor_data = struct.unpack("5H", payload)

for idx in range(5):
if idx == 4:
print(f"Sensor[{idx}] = {sensor_data[idx]}")
else:
print(f"Sensor[{idx}] = {sensor_data[idx]}, ", end="")
# process new data (callbacks will be executed)
self.__smp_server.process(self.__timestamp)

if all(value == 0 for value in sensor_data):
no_line_detection_count[0] += 1
else:
no_line_detection_count[0] = 0

if no_line_detection_count[0] >= 20 or all(
value >= LINE_SENSOR_ON_TRACK_MIN_VALUE for value in sensor_data
):
def main_loop():
"""Main loop:
- Perform simulation steps until Webots is stopping the controller.
# Stop motors, maximum NO Line Detection Counter reached
smp_server.send_data(SPEED_SETPOINT_CHANNEL_NAME, struct.pack("2H", 0, 0))
Returns:
number: Status
"""
status = 0

# SENDING A COMMAND ID SET READY STATUS
smp_server.send_data(
COMMAND_CHANNEL_NAME, struct.pack("B", CMD_ID_SET_READY_STATE)
)
reset_count[0] += 1
supervisor = Supervisor()
timestep = int(supervisor.getBasicTimeStep())

# get serial receiver from supervisor
supervisor_com_rx = supervisor.getDevice(SUPERVISOR_RX_NAME)
if supervisor_com_rx is None:
print(f"ERROR: {SUPERVISOR_RX_NAME} not found.")
status = -1
else:
supervisor_com_rx.enable(timestep)
supervisor_com_rx.setChannel(2)

# get serial emitter from supervisor
supervisor_com_tx = supervisor.getDevice(SUPERVISOR_TX_NAME)
if supervisor_com_tx is None:
print(f"ERROR: {SUPERVISOR_TX_NAME} not found.")
status = -1
else:
# Set left and right motors speed to 1000
smp_server.send_data(SPEED_SETPOINT_CHANNEL_NAME, struct.pack("2H", 1000, 1000))
# supervisor_com_tx.enable(timestep)
supervisor_com_tx.setChannel(1)

# get robot defition
robot_node = supervisor.getFromDef(ROBOT_NAME)
if robot_node is None:
print(f"ERROR: {ROBOT_NAME} not found.")
status = -1

def callback_mode(payload: bytearray) -> None:
"""Callback MODE Channel."""
driving_mode = payload[0]
# connect webots serial nodes to SerMuxProt
s_client = SerialWebots(supervisor_com_tx, supervisor_com_rx)
smp_server = Server(10, s_client)

if driving_mode:
print("Driving Mode Selected.")
else:
print("Train Mode Selected.")
sermux_channel_speed_set = smp_server.create_channel(
SPEED_SET_CHANNEL_NAME, SPEED_SET_DLC
)
sermux_channel_cmd_id = smp_server.create_channel(COMMAND_CHANNEL_NAME, CMD_DLC)

if sermux_channel_speed_set == 0:
print("ERROR: channel SPEED_SET not created.")
status = -1

def main_loop():
"""Main loop:
- Perform simulation steps until Webots is stopping the controller-
if sermux_channel_cmd_id == 0:
print("ERROR: channel CMD not created.")
status = -1

Returns:
number: Status
"""
status = 0
elapsed_time_since_reset = 0 # Elapsed time since reset [ms]
# create instance of robot logic class
controller = RobotController(robot_node, smp_server, timestep)

# Channel subscription.
smp_server.subscribe_to_channel(STATUS_CHANNEL_NAME, callback_status)
smp_server.subscribe_to_channel(LINE_SENSOR_CHANNEL_NAME, callback_line_sensors)
smp_server.subscribe_to_channel(MODE_CHANNEL_NAME, callback_mode)
smp_server.subscribe_to_channel(STATUS_CHANNEL_NAME, controller.callback_status)

if serialMuxProtChannelIdSPEED_SET == 0 or serialMuxProtChannelIdCMD == 0:
print("Channel SPEED_SET not created.")
else:
smp_server.subscribe_to_channel(
LINE_SENSOR_CHANNEL_NAME, controller.callback_line_sensors
)

if robot_node is None:
print(f"Robot DEF {ROBOT_NAME} not found.")
status = -1
smp_server.subscribe_to_channel(MODE_CHANNEL_NAME, controller.callback_mode)

else:
while supervisor.step(timestep) != -1:
elapsed_time_since_reset += timestep
smp_server.process(elapsed_time_since_reset)
# setup successful
if status != -1:
# simulation loop
while supervisor.step(timestep) != -1:
controller.process()

# stopping condition for sequence was reached
if controller.is_sequence_complete is True:
controller.reinitialize()
controller.is_sequence_complete = False

if reset_count[0] != 0:
reinitialize()
return status


Expand Down

0 comments on commit e136bf7

Please sign in to comment.