-
Notifications
You must be signed in to change notification settings - Fork 750
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enhancements to parking environment #464
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ | |
|
||
from gymnasium import Env | ||
import numpy as np | ||
import random | ||
|
||
from highway_env.envs.common.abstract import AbstractEnv | ||
from highway_env.envs.common.observation import MultiAgentObservation, observation_factory | ||
|
@@ -11,6 +12,7 @@ | |
from highway_env.vehicle.graphics import VehicleGraphics | ||
from highway_env.vehicle.kinematics import Vehicle | ||
from highway_env.vehicle.objects import Landmark, Obstacle | ||
from highway_env.utils import are_polygons_intersecting | ||
|
||
|
||
class GoalEnv(Env): | ||
|
@@ -100,7 +102,15 @@ def default_config(cls) -> dict: | |
"scaling": 7, | ||
"controlled_vehicles": 1, | ||
"vehicles_count": 0, | ||
"add_walls": True | ||
"add_walls": True, | ||
"center_window": False, | ||
"n_rows": 2, # number of parking rows | ||
"y_offset": 10, # y distance between parallel rows | ||
"font_size": 22, # display font size for ids ['None' to stop render] | ||
"spots": 14, # spots in each row | ||
"obstacles": False, | ||
"random_start": False, | ||
"prevent_early_collision": False | ||
}) | ||
return config | ||
|
||
|
@@ -131,60 +141,152 @@ def _create_road(self, spots: int = 14) -> None: | |
|
||
:param spots: number of spots in the parking | ||
""" | ||
spots = self.config["spots"] | ||
net = RoadNetwork() | ||
width = 4.0 | ||
lt = (LineType.CONTINUOUS, LineType.CONTINUOUS) | ||
x_offset = 0 | ||
y_offset = 10 | ||
y_offset = self.config["y_offset"] | ||
length = 8 | ||
for k in range(spots): | ||
x = (k + 1 - spots // 2) * (width + x_offset) - width / 2 | ||
net.add_lane("a", "b", StraightLane([x, y_offset], [x, y_offset+length], width=width, line_types=lt)) | ||
net.add_lane("b", "c", StraightLane([x, -y_offset], [x, -y_offset-length], width=width, line_types=lt)) | ||
|
||
row_ys = [] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this could be simplified, why is this list needed? is the y coordinate not just a linear function of the row index? maybe make this a lambda function. |
||
for i in range(self.config["n_rows"]): | ||
if i == 0: | ||
row_ys.append([y_offset, y_offset+length]) | ||
elif i == 1: | ||
row_ys.append([-y_offset, -y_offset-length]) | ||
else: | ||
if i%2 == 0: | ||
row_ys.append([row_ys[i-2][1]+2*y_offset, row_ys[i-2][1]+2*y_offset+length]) | ||
else: | ||
row_ys.append([row_ys[i-2][1]-2*y_offset, row_ys[i-2][1]-2*y_offset-length]) | ||
|
||
id = 0 | ||
for row in range(0, self.config["n_rows"]): | ||
for k in range(spots): | ||
x = (k + 1 - spots // 2) * (width + x_offset) - width / 2 | ||
net.add_lane( | ||
chr(ord('a')+row), chr(ord('a')+row+1), | ||
StraightLane([x, row_ys[row][0]], [x, row_ys[row][1]], | ||
width=width, line_types=lt, identifier=id, | ||
display_font_size=self.config['font_size'] | ||
) | ||
) | ||
id += 1 | ||
|
||
self.road = Road(network=net, | ||
np_random=self.np_random, | ||
record_history=self.config["show_trajectories"]) | ||
|
||
# Store the allowed x,y coordinate ranges to spawn the agent | ||
allowed_y_space = [[-y_offset+8, y_offset-8]] | ||
if len(row_ys) > 2: | ||
for i in range(2, len(row_ys)): | ||
if i%2 == 0: | ||
allowed_y_space.append([row_ys[i-2][1]+8, row_ys[i][0]-8]) | ||
else: | ||
allowed_y_space.append([row_ys[i][0]+8, row_ys[i-2][1]-8]) | ||
|
||
self.allowed_vehicle_space = { | ||
'x' : (-30, 30), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure why this is hardcoded, it doesn't seem to adapt to other configs. Also, this is not used here, please move to create_vehicles. |
||
'y' : allowed_y_space | ||
} | ||
|
||
# Walls | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not sure why the code for walls is that much longer, it used to be 2 loops for horizontal and vertical walls. Can you not just change the endpoints positions to adapt to number of rows? |
||
x_end = abs((1 - spots // 2) * (width + x_offset) - width / 2) | ||
|
||
if len(row_ys) > 1: | ||
wall_y_top = row_ys[-1][1] + 4 if row_ys[-1][1] > 0 else row_ys[-1][1] - 4 | ||
wall_y_bottom = row_ys[-2][1] + 4 if row_ys[-2][1] > 0 else row_ys[-2][1] - 4 | ||
else: | ||
wall_y_bottom = row_ys[-1][1] + 4 | ||
wall_y_top = -y_offset - 4 | ||
|
||
wall_x = x_end + 14 | ||
|
||
for y in [wall_y_top, wall_y_bottom]: | ||
obstacle = Obstacle(self.road, [0, y]) | ||
obstacle.LENGTH, obstacle.WIDTH = (2*wall_x, 1) | ||
obstacle.diagonal = np.sqrt(obstacle.LENGTH**2 + obstacle.WIDTH**2) | ||
self.road.objects.append(obstacle) | ||
|
||
wall_y = 0 | ||
if self.config["n_rows"] > 1 and self.config["n_rows"]%2==1: wall_y = y_offset+4 | ||
elif self.config["n_rows"] == 1: wall_y = y_offset-6 | ||
|
||
for x in [-wall_x, wall_x]: | ||
obstacle = Obstacle( | ||
self.road, | ||
[x, wall_y], | ||
heading=np.pi / 2 | ||
) | ||
obstacle.LENGTH, obstacle.WIDTH = (abs(wall_y_top) + abs(wall_y_bottom), 1) | ||
obstacle.diagonal = np.sqrt(obstacle.LENGTH**2 + obstacle.WIDTH**2) | ||
self.road.objects.append(obstacle) | ||
|
||
if self.config['obstacles'] and self.config["n_rows"] > 3: | ||
self._create_obstacles() | ||
|
||
def _create_obstacles(self): | ||
"""Create some random obstacles""" | ||
obstacle = Obstacle(self.road, (18, -18 - self.config['y_offset']), 90) | ||
obstacle.LENGTH, obstacle.WIDTH = 5, 5 | ||
obstacle.diagonal = np.sqrt(obstacle.LENGTH**2 + obstacle.WIDTH**2) | ||
obstacle.line_color = (187, 84, 49) | ||
self.road.objects.append(obstacle) | ||
|
||
obstacle = Obstacle(self.road, (-18, 18 + self.config['y_offset']), 90) | ||
obstacle.LENGTH, obstacle.WIDTH = 5, 5 | ||
obstacle.diagonal = np.sqrt(obstacle.LENGTH**2 + obstacle.WIDTH**2) | ||
obstacle.line_color = (187, 84, 49) | ||
self.road.objects.append(obstacle) | ||
|
||
def _create_vehicles(self) -> None: | ||
"""Create some new random vehicles of a given type, and add them on the road.""" | ||
# Controlled vehicles | ||
self.controlled_vehicles = [] | ||
for i in range(self.config["controlled_vehicles"]): | ||
vehicle = self.action_type.vehicle_class(self.road, [i*20, 0], 2*np.pi*self.np_random.uniform(), 0) | ||
if self.config["random_start"]: | ||
while(True): | ||
x = np.random.randint(self.allowed_vehicle_space['x'][0], self.allowed_vehicle_space['x'][1]) | ||
y_sector = np.random.choice(range(len(self.allowed_vehicle_space['y']))) | ||
y = np.random.randint(self.allowed_vehicle_space['y'][y_sector][0], self.allowed_vehicle_space['y'][y_sector][1]) | ||
vehicle = self.action_type.vehicle_class(self.road, [x, y], 2*np.pi*self.np_random.uniform(), 0) | ||
|
||
intersect = False | ||
for o in self.road.objects: | ||
res, _, _ = are_polygons_intersecting(vehicle.polygon(), o.polygon(), vehicle.velocity, o.velocity) | ||
intersect |= res | ||
if not intersect: | ||
break | ||
else: | ||
vehicle = self.action_type.vehicle_class(self.road, [0, 0], 2*np.pi*self.np_random.uniform(), 0) | ||
|
||
vehicle.color = VehicleGraphics.EGO_COLOR | ||
self.road.vehicles.append(vehicle) | ||
self.controlled_vehicles.append(vehicle) | ||
|
||
# Goal | ||
lane = self.np_random.choice(self.road.network.lanes_list()) | ||
self.goal = Landmark(self.road, lane.position(lane.length/2, 0), heading=lane.heading) | ||
goal_lane = self.np_random.choice(self.road.network.lanes_list()) | ||
self.goal = Landmark(self.road, goal_lane.position(goal_lane.length/2, 0), heading=goal_lane.heading) | ||
self.road.objects.append(self.goal) | ||
|
||
# Other vehicles | ||
for i in range(self.config["vehicles_count"]): | ||
lane = ("a", "b", i) if self.np_random.uniform() >= 0.5 else ("b", "c", i) | ||
free_lanes = self.road.network.lanes_list().copy() | ||
free_lanes.remove(goal_lane) | ||
random.Random(4).shuffle(free_lanes) | ||
for _ in range(self.config["vehicles_count"] - 1): | ||
lane = free_lanes.pop() | ||
v = Vehicle.make_on_lane(self.road, lane, 4, speed=0) | ||
self.road.vehicles.append(v) | ||
for v in self.road.vehicles: # Prevent early collisions | ||
if v is not self.vehicle and ( | ||
np.linalg.norm(v.position - self.goal.position) < 20 or | ||
np.linalg.norm(v.position - self.vehicle.position) < 20): | ||
self.road.vehicles.remove(v) | ||
|
||
# Walls | ||
if self.config['add_walls']: | ||
width, height = 70, 42 | ||
for y in [-height / 2, height / 2]: | ||
obstacle = Obstacle(self.road, [0, y]) | ||
obstacle.LENGTH, obstacle.WIDTH = (width, 1) | ||
obstacle.diagonal = np.sqrt(obstacle.LENGTH**2 + obstacle.WIDTH**2) | ||
self.road.objects.append(obstacle) | ||
for x in [-width / 2, width / 2]: | ||
obstacle = Obstacle(self.road, [x, 0], heading=np.pi / 2) | ||
obstacle.LENGTH, obstacle.WIDTH = (height, 1) | ||
obstacle.diagonal = np.sqrt(obstacle.LENGTH**2 + obstacle.WIDTH**2) | ||
self.road.objects.append(obstacle) | ||
if self.config["prevent_early_collision"]: | ||
for v in self.road.vehicles: # Prevent early collisions | ||
if v is not self.vehicle and ( | ||
np.linalg.norm(v.position - self.goal.position) < 20 or | ||
np.linalg.norm(v.position - self.vehicle.position) < 20): | ||
self.road.vehicles.remove(v) | ||
|
||
|
||
def compute_reward(self, achieved_goal: np.ndarray, desired_goal: np.ndarray, info: dict, p: float = 0.5) -> float: | ||
""" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -125,14 +125,23 @@ def display(cls, lane: AbstractLane, surface: WorldSurface) -> None: | |
stripes_count = int(2 * (surface.get_height() + surface.get_width()) / (cls.STRIPE_SPACING * surface.scaling)) | ||
s_origin, _ = lane.local_coordinates(surface.origin) | ||
s0 = (int(s_origin) // cls.STRIPE_SPACING - stripes_count // 2) * cls.STRIPE_SPACING | ||
|
||
last_pixel = [] # Store the pixel coordinates of the last pixel of a lane | ||
for side in range(2): | ||
if lane.line_types[side] == LineType.STRIPED: | ||
cls.striped_line(lane, surface, stripes_count, s0, side) | ||
elif lane.line_types[side] == LineType.CONTINUOUS: | ||
cls.continuous_curve(lane, surface, stripes_count, s0, side) | ||
last_pixel.append(cls.continuous_curve(lane, surface, stripes_count, s0, side, return_last_pixel=True)) | ||
elif lane.line_types[side] == LineType.CONTINUOUS_LINE: | ||
cls.continuous_line(lane, surface, stripes_count, s0, side) | ||
|
||
if lane.display_font_size != None: | ||
x = (last_pixel[0][0] + last_pixel[1][0]) // 2 - 8 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that passing around this additional |
||
y = last_pixel[0][1] | ||
font1 = pygame.font.SysFont('arial', lane.display_font_size) | ||
text_surface = font1.render(str(lane.identifier), True, (0, 0, 0)) | ||
surface.blit(text_surface, (x,y)) | ||
|
||
@classmethod | ||
def striped_line(cls, lane: AbstractLane, surface: WorldSurface, stripes_count: int, longitudinal: float, | ||
side: int) -> None: | ||
|
@@ -152,7 +161,7 @@ def striped_line(cls, lane: AbstractLane, surface: WorldSurface, stripes_count: | |
|
||
@classmethod | ||
def continuous_curve(cls, lane: AbstractLane, surface: WorldSurface, stripes_count: int, | ||
longitudinal: float, side: int) -> None: | ||
longitudinal: float, side: int, return_last_pixel: bool = False) -> None: | ||
""" | ||
Draw a striped line on one side of a lane, on a surface. | ||
|
||
|
@@ -161,11 +170,12 @@ def continuous_curve(cls, lane: AbstractLane, surface: WorldSurface, stripes_cou | |
:param stripes_count: the number of stripes to draw | ||
:param longitudinal: the longitudinal position of the first stripe [m] | ||
:param side: which side of the road to draw [0:left, 1:right] | ||
:param return_last_pixel: returns the last pixel coordinate of line | ||
""" | ||
starts = longitudinal + np.arange(stripes_count) * cls.STRIPE_SPACING | ||
ends = longitudinal + np.arange(stripes_count) * cls.STRIPE_SPACING + cls.STRIPE_SPACING | ||
lats = [(side - 0.5) * lane.width_at(s) for s in starts] | ||
cls.draw_stripes(lane, surface, starts, ends, lats) | ||
return cls.draw_stripes(lane, surface, starts, ends, lats, return_last_pixel) | ||
|
||
@classmethod | ||
def continuous_line(cls, lane: AbstractLane, surface: WorldSurface, stripes_count: int, longitudinal: float, | ||
|
@@ -186,7 +196,8 @@ def continuous_line(cls, lane: AbstractLane, surface: WorldSurface, stripes_coun | |
|
||
@classmethod | ||
def draw_stripes(cls, lane: AbstractLane, surface: WorldSurface, | ||
starts: List[float], ends: List[float], lats: List[float]) -> None: | ||
starts: List[float], ends: List[float], lats: List[float], | ||
return_last_pixel: bool = False) -> None: | ||
""" | ||
Draw a set of stripes along a lane. | ||
|
||
|
@@ -195,6 +206,7 @@ def draw_stripes(cls, lane: AbstractLane, surface: WorldSurface, | |
:param starts: a list of starting longitudinal positions for each stripe [m] | ||
:param ends: a list of ending longitudinal positions for each stripe [m] | ||
:param lats: a list of lateral positions for each stripe [m] | ||
:param return_last_pixel: returns the last pixel coordinate of line | ||
""" | ||
starts = np.clip(starts, 0, lane.length) | ||
ends = np.clip(ends, 0, lane.length) | ||
|
@@ -204,6 +216,10 @@ def draw_stripes(cls, lane: AbstractLane, surface: WorldSurface, | |
(surface.vec2pix(lane.position(starts[k], lats[k]))), | ||
(surface.vec2pix(lane.position(ends[k], lats[k]))), | ||
max(surface.pix(cls.STRIPE_WIDTH), 1)) | ||
last_coordinate = (surface.vec2pix(lane.position(ends[k], lats[k]))) | ||
|
||
if return_last_pixel: | ||
return last_coordinate | ||
|
||
@classmethod | ||
def draw_ground(cls, lane: AbstractLane, surface: WorldSurface, color: Tuple[float], width: float, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think you should edit these common files in abstract and graphics, instead just subclass the window_position() method in the environment to customise it.
Also, I don't understand why you use [0.5, 0.5] as a centre position, this is in meters so it looks like a strange choice to me.