From 73d31d50c41b312869d409fd2329dcacf8333d3f Mon Sep 17 00:00:00 2001 From: Adeeb Shihadeh Date: Wed, 4 Sep 2024 17:45:09 -0700 Subject: [PATCH] controlsd: pull out LDW (#33479) * controlsd: pull out LDW * cleanup * good ol mypy --- selfdrive/controls/controlsd.py | 50 +++++++++++---------------------- selfdrive/controls/lib/ldw.py | 41 +++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 34 deletions(-) create mode 100644 selfdrive/controls/lib/ldw.py diff --git a/selfdrive/controls/controlsd.py b/selfdrive/controls/controlsd.py index a56c335ed40429..d9e258f07f07e0 100755 --- a/selfdrive/controls/controlsd.py +++ b/selfdrive/controls/controlsd.py @@ -22,6 +22,7 @@ from opendbc.car.car_helpers import get_car_interface from openpilot.selfdrive.controls.lib.alertmanager import AlertManager, set_offroad_alert from openpilot.selfdrive.controls.lib.drive_helpers import clip_curvature, get_startup_event +from openpilot.selfdrive.controls.lib.ldw import LaneDepartureWarning from openpilot.selfdrive.controls.lib.events import Events, ET from openpilot.selfdrive.controls.lib.latcontrol import LatControl, MIN_LATERAL_CONTROL_SPEED from openpilot.selfdrive.controls.lib.latcontrol_pid import LatControlPID @@ -34,10 +35,6 @@ from openpilot.system.hardware import HARDWARE -LDW_MIN_SPEED = 31 * CV.MPH_TO_MS -LANE_DEPARTURE_THRESHOLD = 0.1 -CAMERA_OFFSET = 0.04 - JOYSTICK_MAX_LAT_ACCEL = 2.5 # m/s^2 REPLAY = "REPLAY" in os.environ @@ -48,7 +45,6 @@ ThermalStatus = log.DeviceState.ThermalStatus State = log.SelfdriveState.OpenpilotState PandaType = log.PandaState.PandaType -Desire = log.Desire LaneChangeState = log.LaneChangeState LaneChangeDirection = log.LaneChangeDirection EventName = car.OnroadEvent.EventName @@ -130,6 +126,8 @@ def __init__(self, CI=None): self.LoC = LongControl(self.CP) self.VM = VehicleModel(self.CP) + self.ldw = LaneDepartureWarning() + self.LaC: LatControl if self.CP.steerControlType == car.CarParams.SteerControlType.angle: self.LaC = LatControlAngle(self.CP, self.CI) @@ -143,7 +141,6 @@ def __init__(self, CI=None): self.active = False self.mismatch_counter = 0 self.cruise_mismatch_counter = 0 - self.last_blinker_frame = 0 self.last_steering_pressed_frame = 0 self.distance_traveled = 0 self.last_functional_fan_frame = 0 @@ -382,6 +379,13 @@ def update_events(self, CS): if self.sm['modelV2'].frameDropPerc > 20: self.events.add(EventName.modeldLagging) + # decrement personality on distance button press + if self.CP.openpilotLongitudinalControl: + if any(not be.pressed and be.type == ButtonType.gapAdjustCruise for be in CS.buttonEvents): + self.personality = (self.personality - 1) % 3 + self.params.put_nonblocking('LongitudinalPersonality', str(self.personality)) + self.events.add(EventName.personalityChanged) + def data_sample(self): """Receive data from sockets""" @@ -557,13 +561,6 @@ def state_control(self, CS): cloudlog.error(f"actuators.{p} not finite {actuators.to_dict()}") setattr(actuators, p, 0.0) - # decrement personality on distance button press - if self.CP.openpilotLongitudinalControl: - if any(not be.pressed and be.type == ButtonType.gapAdjustCruise for be in CS.buttonEvents): - self.personality = (self.personality - 1) % 3 - self.params.put_nonblocking('LongitudinalPersonality', str(self.personality)) - self.events.add(EventName.personalityChanged) - return CC, lac_log def publish_logs(self, CS, CC, lac_log): @@ -594,27 +591,12 @@ def publish_logs(self, CS, CC, lac_log): hudControl.rightLaneVisible = True hudControl.leftLaneVisible = True - recent_blinker = (self.sm.frame - self.last_blinker_frame) * DT_CTRL < 5.0 # 5s blinker cooldown - ldw_allowed = self.is_ldw_enabled and CS.vEgo > LDW_MIN_SPEED and not recent_blinker \ - and not CC.latActive and self.sm['liveCalibration'].calStatus == log.LiveCalibrationData.Status.calibrated - - model_v2 = self.sm['modelV2'] - desire_prediction = model_v2.meta.desirePrediction - if len(desire_prediction) and ldw_allowed: - right_lane_visible = model_v2.laneLineProbs[2] > 0.5 - left_lane_visible = model_v2.laneLineProbs[1] > 0.5 - l_lane_change_prob = desire_prediction[Desire.laneChangeLeft] - r_lane_change_prob = desire_prediction[Desire.laneChangeRight] - - lane_lines = model_v2.laneLines - l_lane_close = left_lane_visible and (lane_lines[1].y[0] > -(1.08 + CAMERA_OFFSET)) - r_lane_close = right_lane_visible and (lane_lines[2].y[0] < (1.08 - CAMERA_OFFSET)) - - hudControl.leftLaneDepart = bool(l_lane_change_prob > LANE_DEPARTURE_THRESHOLD and l_lane_close) - hudControl.rightLaneDepart = bool(r_lane_change_prob > LANE_DEPARTURE_THRESHOLD and r_lane_close) - - if hudControl.rightLaneDepart or hudControl.leftLaneDepart: - self.events.add(EventName.ldw) + self.ldw.update(self.sm.frame, self.sm['modelV2'], CS, CC) + if self.is_ldw_enabled and self.sm['liveCalibration'].calStatus == log.LiveCalibrationData.Status.calibrated: + hudControl.leftLaneDepart = self.ldw.left + hudControl.rightLaneDepart = self.ldw.right + if self.ldw.warning: + self.events.add(EventName.ldw) clear_event_types = set() if ET.WARNING not in self.state_machine.current_alert_types: diff --git a/selfdrive/controls/lib/ldw.py b/selfdrive/controls/lib/ldw.py new file mode 100644 index 00000000000000..caf03fec7327fc --- /dev/null +++ b/selfdrive/controls/lib/ldw.py @@ -0,0 +1,41 @@ +from cereal import log +from openpilot.common.realtime import DT_CTRL +from openpilot.common.conversions import Conversions as CV + + +CAMERA_OFFSET = 0.04 +LDW_MIN_SPEED = 31 * CV.MPH_TO_MS +LANE_DEPARTURE_THRESHOLD = 0.1 + +class LaneDepartureWarning: + def __init__(self): + self.left = False + self.right = False + self.last_blinker_frame = 0 + + def update(self, frame, modelV2, CS, CC): + if CS.leftBlinker or CS.rightBlinker: + self.last_blinker_frame = frame + + recent_blinker = (frame - self.last_blinker_frame) * DT_CTRL < 5.0 # 5s blinker cooldown + ldw_allowed = CS.vEgo > LDW_MIN_SPEED and not recent_blinker and not CC.latActive + + desire_prediction = modelV2.meta.desirePrediction + if len(desire_prediction) and ldw_allowed: + right_lane_visible = modelV2.laneLineProbs[2] > 0.5 + left_lane_visible = modelV2.laneLineProbs[1] > 0.5 + l_lane_change_prob = desire_prediction[log.Desire.laneChangeLeft] + r_lane_change_prob = desire_prediction[log.Desire.laneChangeRight] + + lane_lines = modelV2.laneLines + l_lane_close = left_lane_visible and (lane_lines[1].y[0] > -(1.08 + CAMERA_OFFSET)) + r_lane_close = right_lane_visible and (lane_lines[2].y[0] < (1.08 - CAMERA_OFFSET)) + + self.left = bool(l_lane_change_prob > LANE_DEPARTURE_THRESHOLD and l_lane_close) + self.right = bool(r_lane_change_prob > LANE_DEPARTURE_THRESHOLD and r_lane_close) + else: + self.left, self.right = False, False + + @property + def warning(self) -> bool: + return bool(self.left or self.right)