Skip to content

Commit

Permalink
seminar06-planning
Browse files Browse the repository at this point in the history
  • Loading branch information
Lana243 committed Apr 18, 2024
1 parent a81c0b3 commit 06d3f00
Show file tree
Hide file tree
Showing 82 changed files with 77,937 additions and 0 deletions.
388 changes: 388 additions & 0 deletions seminar06-planning/planner.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,388 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Install python dependencies"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%pip install jsonpickle dacite shapely"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Run the planning server (should be executed only once)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import py_planning\n",
"py_planning.init()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Visualization"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# # you can also open http://127.0.0.1:8008 in your browser\n",
"\n",
"from IPython.display import IFrame\n",
"IFrame('http://127.0.0.1:8008', width=\"100%\", height=650)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Lane centering"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from enum import IntEnum\n",
"import math\n",
"import numpy as np"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"from py_planning.data_types import PlannedPath, PlannedState, State, Position # data types used by planner interface\n",
"from shapely.geometry import LineString, Point\n",
"\n",
"import time\n",
"\n",
"\"\"\"\n",
"find closest point on a polyline to the given point\n",
"\"\"\"\n",
"def get_index_of_closest_point(line: LineString, point: Point):\n",
" closest_point_index = None\n",
" min_distance = float('inf')\n",
"\n",
" for i, line_point in enumerate(line.coords):\n",
" line_point = Point(line_point)\n",
" distance = point.distance(line_point)\n",
" if distance < min_distance:\n",
" min_distance = distance\n",
" closest_point_index = i\n",
"\n",
" return closest_point_index\n",
"\n",
"\n",
"\"\"\"\n",
"This function is called by the simulator for each tick.\n",
"It should return recent planned trajectory up to date with the environment state.\n",
"'state' parameter contains current world observations and vehicle state.\n",
"\"\"\"\n",
"def do_plan(state: State) -> PlannedPath:\n",
" vehicle_pose = state.vehicle_pose\n",
" vehicle_pos = Point(vehicle_pose.pos.x, vehicle_pose.pos.y) # current position of the AV\n",
"\n",
" centerline = LineString([(p.x, p.y) for p in state.lane_path.centerline])\n",
"\n",
" closest_index = get_index_of_closest_point(centerline, vehicle_pos)\n",
" current_velocity = vehicle_pose.velocity\n",
"\n",
" # we leave some previous poses to make AV control stable\n",
" prev_poses_count = 3\n",
" max_poses_count = 50\n",
" first_pose_index = max(closest_index - prev_poses_count, 0)\n",
"\n",
" # as a baseline here we just follow the centerline\n",
" planned_states = [\n",
" PlannedState(pos=p, velocity=current_velocity) for p in state.lane_path.centerline\n",
" ][first_pose_index:first_pose_index+ max_poses_count]\n",
"\n",
" return PlannedPath(states=planned_states)\n",
" \n",
"\n",
"# run the case in the simulator, watch the visualization\n",
"py_planning.run_planner(\n",
" do_plan,\n",
" stop_on_fail=True # set to False to continue planning after case fail (useful for debugging)\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Graph geometry planning"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def create_rotation_matrix(yaw):\n",
" T = np.zeros((len(yaw), 2, 2))\n",
" T[:, 0, 0] = np.cos(yaw)\n",
" T[:, 0, 1] = -np.sin(yaw)\n",
" T[:, 1, 0] = np.sin(yaw)\n",
" T[:, 1, 1] = np.cos(yaw)\n",
"\n",
" return T\n",
" \n",
"class Layer():\n",
" class Id(IntEnum):\n",
" X = 0\n",
" Y = 1\n",
" YAW = 2\n",
" COST = 3\n",
" PARENT = 4\n",
" SIZE = 5\n",
"\n",
" def __init__(self, N=None, nodes=None):\n",
" assert (N is None) ^ (nodes is None)\n",
" if N is not None:\n",
" self.nodes = np.zeros((N, Layer.Id.SIZE))\n",
" if nodes is not None:\n",
" assert nodes.shape[1] == Layer.Id.SIZE\n",
" self.nodes = nodes\n",
" \n",
" @property\n",
" def x(self):\n",
" return self.nodes[:, Layer.Id.X]\n",
" \n",
" @property\n",
" def y(self):\n",
" return self.nodes[:, Layer.Id.Y]\n",
" \n",
" @property\n",
" def yaw(self):\n",
" return self.nodes[:, Layer.Id.YAW]\n",
" \n",
" @property\n",
" def cost(self):\n",
" return self.nodes[:, Layer.Id.COST]\n",
" \n",
" @property\n",
" def parent(self):\n",
" return self.nodes[:, Layer.Id.PARENT]\n",
" \n",
" @property\n",
" def N(self):\n",
" return self.nodes.shape[0]\n",
" \n",
" @property\n",
" def M(self):\n",
" return self.nodes.shape[1]\n",
" \n",
" \n",
"def arc_primitive(c, ds):\n",
" if c == 0:\n",
" return 0, ds, 0\n",
" else:\n",
" dyaw = c * ds\n",
" return dyaw, 1 / c * math.sin(dyaw), 1 / c * (1 - math.cos(dyaw))\n",
"\n",
"\n",
"class Graph(list):\n",
" def nodes_num(self):\n",
" nodes = 0\n",
" for layer in self:\n",
" nodes += layer.N\n",
" return nodes\n",
"\n",
"\n",
"def search(initial_state, lane_path, obstacles, curvature_primitives=[-0.2, 0., 0.2], ds=1, tree_depth=6, sparse=True):\n",
" graph = Graph()\n",
" initial_layer = Layer(1)\n",
" initial_layer.nodes[:, Layer.Id.X] = initial_state.vehicle_pose.pos.x\n",
" initial_layer.nodes[:, Layer.Id.Y] = initial_state.vehicle_pose.pos.y\n",
" initial_layer.nodes[:, Layer.Id.YAW] = initial_state.vehicle_pose.rot\n",
" graph.append(initial_layer) \n",
" \n",
" for i in range(tree_depth):\n",
" X_c = graph[-1]\n",
" X_n = _make_step(X_c, ds, curvature_primitives, lane_path, obstacles)\n",
" if sparse:\n",
" X_n = _sparsify(X_n)\n",
"\n",
" graph.append(X_n)\n",
"\n",
" return graph, _restore_path(graph, np.argmin(graph[-1].nodes[:, Layer.Id.COST]))\n",
"\n",
"\n",
"def _make_step(X_c, ds, curvature_primitives, lane_path, obstacles):\n",
" N = X_c.N\n",
" X_n = Layer(N * len(curvature_primitives))\n",
"\n",
" for i, c in enumerate(curvature_primitives):\n",
" # assumme instant change of curvature and movement along circle\n",
" dyaw, dx, dy = arc_primitive(c, ds)\n",
" shift = np.array([dx, dy])\n",
"\n",
" yaw_c = X_c.yaw\n",
" T = create_rotation_matrix(yaw_c)\n",
"\n",
" X_n.x[i * N : (i + 1) * N] = X_c.x + T[:, 0] @ shift\n",
" X_n.y[i * N : (i + 1) * N] = X_c.y + T[:, 1] @ shift\n",
" X_n.yaw[i * N : (i + 1) * N] = yaw_c + dyaw\n",
" X_n.parent[i * N : (i + 1) * N] = np.arange(N)\n",
" X_n.cost[i * N : (i + 1) * N] = X_c.cost + c ** 2 \n",
" # _update_cost(X_n.nodes[i * N : (i + 1) * N, :], lane_path, obstacles)\n",
"\n",
" return X_n\n",
"\n",
"\n",
"# def _update_cost(X_n, lane_path, obstacles):\n",
"# centerline = LineString([(p.x, p.y) for p in lane_path])\n",
"# for i, node in enumerate(X_n):\n",
"# _, d = get_index_of_closest_point(centerline, Point(node[Layer.Id.X], node[Layer.Id.Y]))\n",
"# X_n[i, Layer.Id.COST] += d\n",
"# # obstacles = get_closest_static_obstacles(obstacles, node[Layer.Id.X], node[Layer.Id.Y], 1)\n",
"# # if len(obstacles) > 0:\n",
"# # d_to_closest_static = dist(node[Layer.Id.X], node[Layer.Id.Y], obstacles[0])\n",
"# # if d_to_closest_static < 2 * max(obstacles[0].w, obstacles[0].h):\n",
"# # X_n[i, Layer.Id.COST] = np.inf\n",
"# # else:\n",
"# # X_n[i, Layer.Id.COST] += 10 * np.exp(-d_to_closest_static + 2 * max(obstacles[0].w, obstacles[0].h))\n",
" \n",
"\n",
"\n",
"def _sparsify(layer, min_nodes=5, step_x=1, step_y=1,step_yaw=0.1):\n",
" if layer.N < min_nodes:\n",
" return layer\n",
"\n",
" def node_to_key(x, y, yaw):\n",
" return (round(x / step_x), round(y / step_y), round(yaw / step_yaw))\n",
" d = {}\n",
" for i in range(layer.N):\n",
" key = node_to_key(layer.x[i], layer.y[i], layer.yaw[i])\n",
" if key in d:\n",
" d[key] = min(d[key], (layer.cost[i], i))\n",
" else:\n",
" d[key] = (layer.cost[i], i)\n",
" indx = list(map(lambda value: value[1][1], d.items()))\n",
" layer.nodes = layer.nodes[indx]\n",
"\n",
" return layer\n",
"\n",
"\n",
"def _restore_path(graph, i):\n",
" path = Graph()\n",
" for j in range(len(graph)):\n",
" layer = graph[-j - 1]\n",
" path.append(Layer(nodes=np.copy(layer.nodes[i:i+1])))\n",
" i = int(layer.parent[i])\n",
"\n",
" # fix parent linkage\n",
" path[-1].parent[:] = 0\n",
"\n",
" path.reverse()\n",
" return path"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from py_planning.data_types import PlannedPath, PlannedState, State, Position # data types used by planner interface\n",
"from shapely.geometry import LineString, Point\n",
"import matplotlib.pyplot as plt\n",
"\n",
"\"\"\"\n",
"find closest point on a polyline to the given point\n",
"\"\"\"\n",
"def get_index_of_closest_point(line: LineString, point: Point):\n",
" closest_point_index = None\n",
" min_distance = float('inf')\n",
"\n",
" for i, line_point in enumerate(line.coords):\n",
" line_point = Point(line_point)\n",
" distance = point.distance(line_point)\n",
" if distance < min_distance:\n",
" min_distance = distance\n",
" closest_point_index = i\n",
"\n",
" return closest_point_index, min_distance\n",
"\n",
"\n",
"def dist(x, y, static_obstacle):\n",
" return (x - static_obstacle.p[0]) ** 2 + (y - static_obstacle.p[1]) ** 2\n",
"\n",
"\n",
"def get_closest_static_obstacles(static_obstacles,x, y, k):\n",
" obstacles = sorted(static_obstacles, key=lambda obstacle: dist(x, y, obstacle))\n",
" return obstacles[:min(len(obstacles), k)]\n",
"\n",
"\n",
"def do_graph_planning(state: State) -> PlannedPath:\n",
" vehicle_pose = state.vehicle_pose\n",
" vehicle_pos = Point(vehicle_pose.pos.x, vehicle_pose.pos.y) # current position of the AV\n",
" centerline = LineString([(p.x, p.y) for p in state.lane_path.centerline])\n",
" closest_index, _ = get_index_of_closest_point(centerline, vehicle_pos)\n",
" lane_path = state.lane_path.centerline[max(0, closest_index - 20) : min(len(state.lane_path.centerline), closest_index + 20) : 2]\n",
" obstacles = get_closest_static_obstacles(state.static_obstacles, state.vehicle_pose.pos.x, state.vehicle_pose.pos.y, 1)\n",
"\n",
" ds = 1\n",
" graph, path = search(state, lane_path, obstacles, tree_depth=12, ds=ds)\n",
" planned_path = list(map(lambda layer: PlannedState(pos=Position(float(layer.nodes[0, Layer.Id.X]), float(layer.nodes[0, Layer.Id.Y])), velocity=state.vehicle_pose.velocity, rot=float(layer.nodes[0, Layer.Id.YAW])), path))\n",
" return PlannedPath(states=planned_path) \n",
"\n",
"\n",
"py_planning.run_planner(\n",
" do_graph_planning,\n",
" stop_on_fail=True # set to False to continue planning after case fail (useful for debugging)\n",
")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.7"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
2 changes: 2 additions & 0 deletions seminar06-planning/py_planning/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .py_planning import init, run_planner
from . import data_types
Loading

0 comments on commit 06d3f00

Please sign in to comment.