openpilot v0.9.6 release

date: 2024-01-12T10:13:37
master commit: ba792d576a49a0899b88a753fa1c52956bedf9e6
This commit is contained in:
FrogAi
2024-01-12 22:39:28 -07:00
commit 08e9fb1edc
1881 changed files with 653708 additions and 0 deletions

View File

View File

@@ -0,0 +1,62 @@
import copy
import os
import json
from collections import defaultdict
from dataclasses import dataclass
from typing import List, Dict, Optional
from openpilot.common.basedir import BASEDIR
from openpilot.common.params import Params
from openpilot.selfdrive.controls.lib.events import Alert
with open(os.path.join(BASEDIR, "selfdrive/controls/lib/alerts_offroad.json")) as f:
OFFROAD_ALERTS = json.load(f)
def set_offroad_alert(alert: str, show_alert: bool, extra_text: Optional[str] = None) -> None:
if show_alert:
a = copy.copy(OFFROAD_ALERTS[alert])
a['extra'] = extra_text or ''
Params().put(alert, json.dumps(a))
else:
Params().remove(alert)
@dataclass
class AlertEntry:
alert: Optional[Alert] = None
start_frame: int = -1
end_frame: int = -1
def active(self, frame: int) -> bool:
return frame <= self.end_frame
class AlertManager:
def __init__(self):
self.alerts: Dict[str, AlertEntry] = defaultdict(AlertEntry)
def add_many(self, frame: int, alerts: List[Alert]) -> None:
for alert in alerts:
entry = self.alerts[alert.alert_type]
entry.alert = alert
if not entry.active(frame):
entry.start_frame = frame
min_end_frame = entry.start_frame + alert.duration
entry.end_frame = max(frame + 1, min_end_frame)
def process_alerts(self, frame: int, clear_event_types: set) -> Optional[Alert]:
current_alert = AlertEntry()
for v in self.alerts.values():
if not v.alert:
continue
if v.alert.event_type in clear_event_types:
v.end_frame = -1
# sort by priority first and then by start_frame
greater = current_alert.alert is None or (v.alert.priority, v.start_frame) > (current_alert.alert.priority, current_alert.start_frame)
if v.active(frame) and greater:
current_alert = v
return current_alert.alert

View File

@@ -0,0 +1,56 @@
{
"Offroad_TemperatureTooHigh": {
"text": "Device temperature too high. System cooling down before starting. Current internal component temperature: %1",
"severity": 1
},
"Offroad_ConnectivityNeededPrompt": {
"text": "Immediately connect to the internet to check for updates. If you do not connect to the internet, openpilot won't engage in %1",
"severity": 0,
"_comment": "Set extra field to number of days"
},
"Offroad_ConnectivityNeeded": {
"text": "Connect to internet to check for updates. openpilot won't automatically start until it connects to internet to check for updates.",
"severity": 1
},
"Offroad_UpdateFailed": {
"text": "Unable to download updates\n%1",
"severity": 1,
"_comment": "Set extra field to the failed reason."
},
"Offroad_InvalidTime": {
"text": "Invalid date and time settings, system won't start. Connect to internet to set time.",
"severity": 1
},
"Offroad_IsTakingSnapshot": {
"text": "Taking camera snapshots. System won't start until finished.",
"severity": 0
},
"Offroad_NeosUpdate": {
"text": "An update to your device's operating system is downloading in the background. You will be prompted to update when it's ready to install.",
"severity": 0
},
"Offroad_UnofficialHardware": {
"text": "Device failed to register. It will not connect to or upload to comma.ai servers, and receives no support from comma.ai. If this is an official device, visit https://comma.ai/support.",
"severity": 1
},
"Offroad_StorageMissing": {
"text": "NVMe drive not mounted.",
"severity": 1
},
"Offroad_BadNvme": {
"text": "Unsupported NVMe drive detected. Device may draw significantly more power and overheat due to the unsupported NVMe.",
"severity": 1
},
"Offroad_CarUnrecognized": {
"text": "openpilot was unable to identify your car. Your car is either unsupported or its ECUs are not recognized. Please submit a pull request to add the firmware versions to the proper vehicle. Need help? Join discord.comma.ai.",
"severity": 0
},
"Offroad_NoFirmware": {
"text": "openpilot was unable to identify your car. Check integrity of cables and ensure all connections are secure, particularly that the comma power is fully inserted in the OBD-II port of the vehicle. Need help? Join discord.comma.ai.",
"severity": 0
},
"Offroad_Recalibration": {
"text": "openpilot detected a change in the device's mounting position. Ensure the device is fully seated in the mount and the mount is firmly secured to the windshield.",
"severity": 0
}
}

View File

@@ -0,0 +1,114 @@
from cereal import log
from openpilot.common.conversions import Conversions as CV
from openpilot.common.realtime import DT_MDL
LaneChangeState = log.LateralPlan.LaneChangeState
LaneChangeDirection = log.LateralPlan.LaneChangeDirection
LANE_CHANGE_SPEED_MIN = 20 * CV.MPH_TO_MS
LANE_CHANGE_TIME_MAX = 10.
DESIRES = {
LaneChangeDirection.none: {
LaneChangeState.off: log.LateralPlan.Desire.none,
LaneChangeState.preLaneChange: log.LateralPlan.Desire.none,
LaneChangeState.laneChangeStarting: log.LateralPlan.Desire.none,
LaneChangeState.laneChangeFinishing: log.LateralPlan.Desire.none,
},
LaneChangeDirection.left: {
LaneChangeState.off: log.LateralPlan.Desire.none,
LaneChangeState.preLaneChange: log.LateralPlan.Desire.none,
LaneChangeState.laneChangeStarting: log.LateralPlan.Desire.laneChangeLeft,
LaneChangeState.laneChangeFinishing: log.LateralPlan.Desire.laneChangeLeft,
},
LaneChangeDirection.right: {
LaneChangeState.off: log.LateralPlan.Desire.none,
LaneChangeState.preLaneChange: log.LateralPlan.Desire.none,
LaneChangeState.laneChangeStarting: log.LateralPlan.Desire.laneChangeRight,
LaneChangeState.laneChangeFinishing: log.LateralPlan.Desire.laneChangeRight,
},
}
class DesireHelper:
def __init__(self):
self.lane_change_state = LaneChangeState.off
self.lane_change_direction = LaneChangeDirection.none
self.lane_change_timer = 0.0
self.lane_change_ll_prob = 1.0
self.keep_pulse_timer = 0.0
self.prev_one_blinker = False
self.desire = log.LateralPlan.Desire.none
def update(self, carstate, lateral_active, lane_change_prob):
v_ego = carstate.vEgo
one_blinker = carstate.leftBlinker != carstate.rightBlinker
below_lane_change_speed = v_ego < LANE_CHANGE_SPEED_MIN
if not lateral_active or self.lane_change_timer > LANE_CHANGE_TIME_MAX:
self.lane_change_state = LaneChangeState.off
self.lane_change_direction = LaneChangeDirection.none
else:
# LaneChangeState.off
if self.lane_change_state == LaneChangeState.off and one_blinker and not self.prev_one_blinker and not below_lane_change_speed:
self.lane_change_state = LaneChangeState.preLaneChange
self.lane_change_ll_prob = 1.0
# LaneChangeState.preLaneChange
elif self.lane_change_state == LaneChangeState.preLaneChange:
# Set lane change direction
self.lane_change_direction = LaneChangeDirection.left if \
carstate.leftBlinker else LaneChangeDirection.right
torque_applied = carstate.steeringPressed and \
((carstate.steeringTorque > 0 and self.lane_change_direction == LaneChangeDirection.left) or
(carstate.steeringTorque < 0 and self.lane_change_direction == LaneChangeDirection.right))
blindspot_detected = ((carstate.leftBlindspot and self.lane_change_direction == LaneChangeDirection.left) or
(carstate.rightBlindspot and self.lane_change_direction == LaneChangeDirection.right))
if not one_blinker or below_lane_change_speed:
self.lane_change_state = LaneChangeState.off
self.lane_change_direction = LaneChangeDirection.none
elif torque_applied and not blindspot_detected:
self.lane_change_state = LaneChangeState.laneChangeStarting
# LaneChangeState.laneChangeStarting
elif self.lane_change_state == LaneChangeState.laneChangeStarting:
# fade out over .5s
self.lane_change_ll_prob = max(self.lane_change_ll_prob - 2 * DT_MDL, 0.0)
# 98% certainty
if lane_change_prob < 0.02 and self.lane_change_ll_prob < 0.01:
self.lane_change_state = LaneChangeState.laneChangeFinishing
# LaneChangeState.laneChangeFinishing
elif self.lane_change_state == LaneChangeState.laneChangeFinishing:
# fade in laneline over 1s
self.lane_change_ll_prob = min(self.lane_change_ll_prob + DT_MDL, 1.0)
if self.lane_change_ll_prob > 0.99:
self.lane_change_direction = LaneChangeDirection.none
if one_blinker:
self.lane_change_state = LaneChangeState.preLaneChange
else:
self.lane_change_state = LaneChangeState.off
if self.lane_change_state in (LaneChangeState.off, LaneChangeState.preLaneChange):
self.lane_change_timer = 0.0
else:
self.lane_change_timer += DT_MDL
self.prev_one_blinker = one_blinker
self.desire = DESIRES[self.lane_change_direction][self.lane_change_state]
# Send keep pulse once per second during LaneChangeStart.preLaneChange
if self.lane_change_state in (LaneChangeState.off, LaneChangeState.laneChangeStarting):
self.keep_pulse_timer = 0.0
elif self.lane_change_state == LaneChangeState.preLaneChange:
self.keep_pulse_timer += DT_MDL
if self.keep_pulse_timer > 1.0:
self.keep_pulse_timer = 0.0
elif self.desire in (log.LateralPlan.Desire.keepLeft, log.LateralPlan.Desire.keepRight):
self.desire = log.LateralPlan.Desire.none

View File

@@ -0,0 +1,213 @@
import math
from cereal import car, log
from openpilot.common.conversions import Conversions as CV
from openpilot.common.numpy_fast import clip, interp
from openpilot.common.realtime import DT_MDL
from openpilot.selfdrive.modeld.constants import ModelConstants
# WARNING: this value was determined based on the model's training distribution,
# model predictions above this speed can be unpredictable
# V_CRUISE's are in kph
V_CRUISE_MIN = 8
V_CRUISE_MAX = 145
V_CRUISE_UNSET = 255
V_CRUISE_INITIAL = 40
V_CRUISE_INITIAL_EXPERIMENTAL_MODE = 105
IMPERIAL_INCREMENT = 1.6 # should be CV.MPH_TO_KPH, but this causes rounding errors
MIN_SPEED = 1.0
CONTROL_N = 17
CAR_ROTATION_RADIUS = 0.0
# EU guidelines
MAX_LATERAL_JERK = 5.0
MAX_VEL_ERR = 5.0
ButtonEvent = car.CarState.ButtonEvent
ButtonType = car.CarState.ButtonEvent.Type
CRUISE_LONG_PRESS = 50
CRUISE_NEAREST_FUNC = {
ButtonType.accelCruise: math.ceil,
ButtonType.decelCruise: math.floor,
}
CRUISE_INTERVAL_SIGN = {
ButtonType.accelCruise: +1,
ButtonType.decelCruise: -1,
}
class VCruiseHelper:
def __init__(self, CP):
self.CP = CP
self.v_cruise_kph = V_CRUISE_UNSET
self.v_cruise_cluster_kph = V_CRUISE_UNSET
self.v_cruise_kph_last = 0
self.button_timers = {ButtonType.decelCruise: 0, ButtonType.accelCruise: 0}
self.button_change_states = {btn: {"standstill": False, "enabled": False} for btn in self.button_timers}
@property
def v_cruise_initialized(self):
return self.v_cruise_kph != V_CRUISE_UNSET
def update_v_cruise(self, CS, enabled, is_metric):
self.v_cruise_kph_last = self.v_cruise_kph
if CS.cruiseState.available:
if not self.CP.pcmCruise:
# if stock cruise is completely disabled, then we can use our own set speed logic
self._update_v_cruise_non_pcm(CS, enabled, is_metric)
self.v_cruise_cluster_kph = self.v_cruise_kph
self.update_button_timers(CS, enabled)
else:
self.v_cruise_kph = CS.cruiseState.speed * CV.MS_TO_KPH
self.v_cruise_cluster_kph = CS.cruiseState.speedCluster * CV.MS_TO_KPH
else:
self.v_cruise_kph = V_CRUISE_UNSET
self.v_cruise_cluster_kph = V_CRUISE_UNSET
def _update_v_cruise_non_pcm(self, CS, enabled, is_metric):
# handle button presses. TODO: this should be in state_control, but a decelCruise press
# would have the effect of both enabling and changing speed is checked after the state transition
if not enabled:
return
long_press = False
button_type = None
v_cruise_delta = 1. if is_metric else IMPERIAL_INCREMENT
for b in CS.buttonEvents:
if b.type.raw in self.button_timers and not b.pressed:
if self.button_timers[b.type.raw] > CRUISE_LONG_PRESS:
return # end long press
button_type = b.type.raw
break
else:
for k in self.button_timers.keys():
if self.button_timers[k] and self.button_timers[k] % CRUISE_LONG_PRESS == 0:
button_type = k
long_press = True
break
if button_type is None:
return
# Don't adjust speed when pressing resume to exit standstill
cruise_standstill = self.button_change_states[button_type]["standstill"] or CS.cruiseState.standstill
if button_type == ButtonType.accelCruise and cruise_standstill:
return
# Don't adjust speed if we've enabled since the button was depressed (some ports enable on rising edge)
if not self.button_change_states[button_type]["enabled"]:
return
v_cruise_delta = v_cruise_delta * (5 if long_press else 1)
if long_press and self.v_cruise_kph % v_cruise_delta != 0: # partial interval
self.v_cruise_kph = CRUISE_NEAREST_FUNC[button_type](self.v_cruise_kph / v_cruise_delta) * v_cruise_delta
else:
self.v_cruise_kph += v_cruise_delta * CRUISE_INTERVAL_SIGN[button_type]
# If set is pressed while overriding, clip cruise speed to minimum of vEgo
if CS.gasPressed and button_type in (ButtonType.decelCruise, ButtonType.setCruise):
self.v_cruise_kph = max(self.v_cruise_kph, CS.vEgo * CV.MS_TO_KPH)
self.v_cruise_kph = clip(round(self.v_cruise_kph, 1), V_CRUISE_MIN, V_CRUISE_MAX)
def update_button_timers(self, CS, enabled):
# increment timer for buttons still pressed
for k in self.button_timers:
if self.button_timers[k] > 0:
self.button_timers[k] += 1
for b in CS.buttonEvents:
if b.type.raw in self.button_timers:
# Start/end timer and store current state on change of button pressed
self.button_timers[b.type.raw] = 1 if b.pressed else 0
self.button_change_states[b.type.raw] = {"standstill": CS.cruiseState.standstill, "enabled": enabled}
def initialize_v_cruise(self, CS, experimental_mode: bool) -> None:
# initializing is handled by the PCM
if self.CP.pcmCruise:
return
initial = V_CRUISE_INITIAL_EXPERIMENTAL_MODE if experimental_mode else V_CRUISE_INITIAL
# 250kph or above probably means we never had a set speed
if any(b.type in (ButtonType.accelCruise, ButtonType.resumeCruise) for b in CS.buttonEvents) and self.v_cruise_kph_last < 250:
self.v_cruise_kph = self.v_cruise_kph_last
else:
self.v_cruise_kph = int(round(clip(CS.vEgo * CV.MS_TO_KPH, initial, V_CRUISE_MAX)))
self.v_cruise_cluster_kph = self.v_cruise_kph
def apply_deadzone(error, deadzone):
if error > deadzone:
error -= deadzone
elif error < - deadzone:
error += deadzone
else:
error = 0.
return error
def apply_center_deadzone(error, deadzone):
if (error > - deadzone) and (error < deadzone):
error = 0.
return error
def rate_limit(new_value, last_value, dw_step, up_step):
return clip(new_value, last_value + dw_step, last_value + up_step)
def get_lag_adjusted_curvature(CP, v_ego, psis, curvatures, curvature_rates):
if len(psis) != CONTROL_N:
psis = [0.0]*CONTROL_N
curvatures = [0.0]*CONTROL_N
curvature_rates = [0.0]*CONTROL_N
v_ego = max(MIN_SPEED, v_ego)
# TODO this needs more thought, use .2s extra for now to estimate other delays
delay = CP.steerActuatorDelay + .2
# MPC can plan to turn the wheel and turn back before t_delay. This means
# in high delay cases some corrections never even get commanded. So just use
# psi to calculate a simple linearization of desired curvature
current_curvature_desired = curvatures[0]
psi = interp(delay, ModelConstants.T_IDXS[:CONTROL_N], psis)
average_curvature_desired = psi / (v_ego * delay)
desired_curvature = 2 * average_curvature_desired - current_curvature_desired
# This is the "desired rate of the setpoint" not an actual desired rate
desired_curvature_rate = curvature_rates[0]
max_curvature_rate = MAX_LATERAL_JERK / (v_ego**2) # inexact calculation, check https://github.com/commaai/openpilot/pull/24755
safe_desired_curvature_rate = clip(desired_curvature_rate,
-max_curvature_rate,
max_curvature_rate)
safe_desired_curvature = clip(desired_curvature,
current_curvature_desired - max_curvature_rate * DT_MDL,
current_curvature_desired + max_curvature_rate * DT_MDL)
return safe_desired_curvature, safe_desired_curvature_rate
def get_friction(lateral_accel_error: float, lateral_accel_deadzone: float, friction_threshold: float,
torque_params: car.CarParams.LateralTorqueTuning, friction_compensation: bool) -> float:
friction_interp = interp(
apply_center_deadzone(lateral_accel_error, lateral_accel_deadzone),
[-friction_threshold, friction_threshold],
[-torque_params.friction, torque_params.friction]
)
friction = float(friction_interp) if friction_compensation else 0.0
return friction
def get_speed_error(modelV2: log.ModelDataV2, v_ego: float) -> float:
# ToDo: Try relative error, and absolute speed
if len(modelV2.temporalPose.trans):
vel_err = clip(modelV2.temporalPose.trans[0] - v_ego, -MAX_VEL_ERR, MAX_VEL_ERR)
return float(vel_err)
return 0.0

996
selfdrive/controls/lib/events.py Executable file
View File

@@ -0,0 +1,996 @@
#!/usr/bin/env python3
import math
import os
from enum import IntEnum
from typing import Dict, Union, Callable, List, Optional
from cereal import log, car
import cereal.messaging as messaging
from openpilot.common.conversions import Conversions as CV
from openpilot.common.realtime import DT_CTRL
from openpilot.selfdrive.locationd.calibrationd import MIN_SPEED_FILTER
from openpilot.system.version import get_short_branch
AlertSize = log.ControlsState.AlertSize
AlertStatus = log.ControlsState.AlertStatus
VisualAlert = car.CarControl.HUDControl.VisualAlert
AudibleAlert = car.CarControl.HUDControl.AudibleAlert
EventName = car.CarEvent.EventName
# Alert priorities
class Priority(IntEnum):
LOWEST = 0
LOWER = 1
LOW = 2
MID = 3
HIGH = 4
HIGHEST = 5
# Event types
class ET:
ENABLE = 'enable'
PRE_ENABLE = 'preEnable'
OVERRIDE_LATERAL = 'overrideLateral'
OVERRIDE_LONGITUDINAL = 'overrideLongitudinal'
NO_ENTRY = 'noEntry'
WARNING = 'warning'
USER_DISABLE = 'userDisable'
SOFT_DISABLE = 'softDisable'
IMMEDIATE_DISABLE = 'immediateDisable'
PERMANENT = 'permanent'
# get event name from enum
EVENT_NAME = {v: k for k, v in EventName.schema.enumerants.items()}
class Events:
def __init__(self):
self.events: List[int] = []
self.static_events: List[int] = []
self.events_prev = dict.fromkeys(EVENTS.keys(), 0)
@property
def names(self) -> List[int]:
return self.events
def __len__(self) -> int:
return len(self.events)
def add(self, event_name: int, static: bool=False) -> None:
if static:
self.static_events.append(event_name)
self.events.append(event_name)
def clear(self) -> None:
self.events_prev = {k: (v + 1 if k in self.events else 0) for k, v in self.events_prev.items()}
self.events = self.static_events.copy()
def contains(self, event_type: str) -> bool:
return any(event_type in EVENTS.get(e, {}) for e in self.events)
def create_alerts(self, event_types: List[str], callback_args=None):
if callback_args is None:
callback_args = []
ret = []
for e in self.events:
types = EVENTS[e].keys()
for et in event_types:
if et in types:
alert = EVENTS[e][et]
if not isinstance(alert, Alert):
alert = alert(*callback_args)
if DT_CTRL * (self.events_prev[e] + 1) >= alert.creation_delay:
alert.alert_type = f"{EVENT_NAME[e]}/{et}"
alert.event_type = et
ret.append(alert)
return ret
def add_from_msg(self, events):
for e in events:
self.events.append(e.name.raw)
def to_msg(self):
ret = []
for event_name in self.events:
event = car.CarEvent.new_message()
event.name = event_name
for event_type in EVENTS.get(event_name, {}):
setattr(event, event_type, True)
ret.append(event)
return ret
class Alert:
def __init__(self,
alert_text_1: str,
alert_text_2: str,
alert_status: log.ControlsState.AlertStatus,
alert_size: log.ControlsState.AlertSize,
priority: Priority,
visual_alert: car.CarControl.HUDControl.VisualAlert,
audible_alert: car.CarControl.HUDControl.AudibleAlert,
duration: float,
alert_rate: float = 0.,
creation_delay: float = 0.):
self.alert_text_1 = alert_text_1
self.alert_text_2 = alert_text_2
self.alert_status = alert_status
self.alert_size = alert_size
self.priority = priority
self.visual_alert = visual_alert
self.audible_alert = audible_alert
self.duration = int(duration / DT_CTRL)
self.alert_rate = alert_rate
self.creation_delay = creation_delay
self.alert_type = ""
self.event_type: Optional[str] = None
def __str__(self) -> str:
return f"{self.alert_text_1}/{self.alert_text_2} {self.priority} {self.visual_alert} {self.audible_alert}"
def __gt__(self, alert2) -> bool:
if not isinstance(alert2, Alert):
return False
return self.priority > alert2.priority
class NoEntryAlert(Alert):
def __init__(self, alert_text_2: str,
alert_text_1: str = "openpilot Unavailable",
visual_alert: car.CarControl.HUDControl.VisualAlert=VisualAlert.none):
super().__init__(alert_text_1, alert_text_2, AlertStatus.normal,
AlertSize.mid, Priority.LOW, visual_alert,
AudibleAlert.refuse, 3.)
class SoftDisableAlert(Alert):
def __init__(self, alert_text_2: str):
super().__init__("TAKE CONTROL IMMEDIATELY", alert_text_2,
AlertStatus.userPrompt, AlertSize.full,
Priority.MID, VisualAlert.steerRequired,
AudibleAlert.warningSoft, 2.),
# less harsh version of SoftDisable, where the condition is user-triggered
class UserSoftDisableAlert(SoftDisableAlert):
def __init__(self, alert_text_2: str):
super().__init__(alert_text_2),
self.alert_text_1 = "openpilot will disengage"
class ImmediateDisableAlert(Alert):
def __init__(self, alert_text_2: str):
super().__init__("TAKE CONTROL IMMEDIATELY", alert_text_2,
AlertStatus.critical, AlertSize.full,
Priority.HIGHEST, VisualAlert.steerRequired,
AudibleAlert.warningImmediate, 4.),
class EngagementAlert(Alert):
def __init__(self, audible_alert: car.CarControl.HUDControl.AudibleAlert):
super().__init__("", "",
AlertStatus.normal, AlertSize.none,
Priority.MID, VisualAlert.none,
audible_alert, .2),
class NormalPermanentAlert(Alert):
def __init__(self, alert_text_1: str, alert_text_2: str = "", duration: float = 0.2, priority: Priority = Priority.LOWER, creation_delay: float = 0.):
super().__init__(alert_text_1, alert_text_2,
AlertStatus.normal, AlertSize.mid if len(alert_text_2) else AlertSize.small,
priority, VisualAlert.none, AudibleAlert.none, duration, creation_delay=creation_delay),
class StartupAlert(Alert):
def __init__(self, alert_text_1: str, alert_text_2: str = "Always keep hands on wheel and eyes on road", alert_status=AlertStatus.normal):
super().__init__(alert_text_1, alert_text_2,
alert_status, AlertSize.mid,
Priority.LOWER, VisualAlert.none, AudibleAlert.none, 5.),
# ********** helper functions **********
def get_display_speed(speed_ms: float, metric: bool) -> str:
speed = int(round(speed_ms * (CV.MS_TO_KPH if metric else CV.MS_TO_MPH)))
unit = 'km/h' if metric else 'mph'
return f"{speed} {unit}"
# ********** alert callback functions **********
AlertCallbackType = Callable[[car.CarParams, car.CarState, messaging.SubMaster, bool, int], Alert]
def soft_disable_alert(alert_text_2: str) -> AlertCallbackType:
def func(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
if soft_disable_time < int(0.5 / DT_CTRL):
return ImmediateDisableAlert(alert_text_2)
return SoftDisableAlert(alert_text_2)
return func
def user_soft_disable_alert(alert_text_2: str) -> AlertCallbackType:
def func(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
if soft_disable_time < int(0.5 / DT_CTRL):
return ImmediateDisableAlert(alert_text_2)
return UserSoftDisableAlert(alert_text_2)
return func
def startup_master_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
branch = get_short_branch("") # Ensure get_short_branch is cached to avoid lags on startup
if "REPLAY" in os.environ:
branch = "replay"
return StartupAlert("WARNING: This branch is not tested", branch, alert_status=AlertStatus.userPrompt)
def below_engage_speed_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
return NoEntryAlert(f"Drive above {get_display_speed(CP.minEnableSpeed, metric)} to engage")
def below_steer_speed_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
return Alert(
f"Steer Unavailable Below {get_display_speed(CP.minSteerSpeed, metric)}",
"",
AlertStatus.userPrompt, AlertSize.small,
Priority.LOW, VisualAlert.steerRequired, AudibleAlert.prompt, 0.4)
def calibration_incomplete_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
first_word = 'Recalibration' if sm['liveCalibration'].calStatus == log.LiveCalibrationData.Status.recalibrating else 'Calibration'
return Alert(
f"{first_word} in Progress: {sm['liveCalibration'].calPerc:.0f}%",
f"Drive Above {get_display_speed(MIN_SPEED_FILTER, metric)}",
AlertStatus.normal, AlertSize.mid,
Priority.LOWEST, VisualAlert.none, AudibleAlert.none, .2)
def no_gps_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
return Alert(
"Poor GPS reception",
"Hardware malfunctioning if sky is visible",
AlertStatus.normal, AlertSize.mid,
Priority.LOWER, VisualAlert.none, AudibleAlert.none, .2, creation_delay=300.)
# *** debug alerts ***
def out_of_space_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
full_perc = round(100. - sm['deviceState'].freeSpacePercent)
return NormalPermanentAlert("Out of Storage", f"{full_perc}% full")
def posenet_invalid_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
mdl = sm['modelV2'].velocity.x[0] if len(sm['modelV2'].velocity.x) else math.nan
err = CS.vEgo - mdl
msg = f"Speed Error: {err:.1f} m/s"
return NoEntryAlert(msg, alert_text_1="Posenet Speed Invalid")
def process_not_running_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
not_running = [p.name for p in sm['managerState'].processes if not p.running and p.shouldBeRunning]
msg = ', '.join(not_running)
return NoEntryAlert(msg, alert_text_1="Process Not Running")
def comm_issue_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
bs = [s for s in sm.data.keys() if not sm.all_checks([s, ])]
msg = ', '.join(bs[:4]) # can't fit too many on one line
return NoEntryAlert(msg, alert_text_1="Communication Issue Between Processes")
def camera_malfunction_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
all_cams = ('roadCameraState', 'driverCameraState', 'wideRoadCameraState')
bad_cams = [s.replace('State', '') for s in all_cams if s in sm.data.keys() and not sm.all_checks([s, ])]
return NormalPermanentAlert("Camera Malfunction", ', '.join(bad_cams))
def calibration_invalid_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
rpy = sm['liveCalibration'].rpyCalib
yaw = math.degrees(rpy[2] if len(rpy) == 3 else math.nan)
pitch = math.degrees(rpy[1] if len(rpy) == 3 else math.nan)
angles = f"Remount Device (Pitch: {pitch:.1f}°, Yaw: {yaw:.1f}°)"
return NormalPermanentAlert("Calibration Invalid", angles)
def overheat_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
cpu = max(sm['deviceState'].cpuTempC, default=0.)
gpu = max(sm['deviceState'].gpuTempC, default=0.)
temp = max((cpu, gpu, sm['deviceState'].memoryTempC))
return NormalPermanentAlert("System Overheated", f"{temp:.0f} °C")
def low_memory_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
return NormalPermanentAlert("Low Memory", f"{sm['deviceState'].memoryUsagePercent}% used")
def high_cpu_usage_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
x = max(sm['deviceState'].cpuUsagePercent, default=0.)
return NormalPermanentAlert("High CPU Usage", f"{x}% used")
def modeld_lagging_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
return NormalPermanentAlert("Driving Model Lagging", f"{sm['modelV2'].frameDropPerc:.1f}% frames dropped")
def wrong_car_mode_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
text = "Enable Adaptive Cruise to Engage"
if CP.carName == "honda":
text = "Enable Main Switch to Engage"
return NoEntryAlert(text)
def joystick_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
axes = sm['testJoystick'].axes
gb, steer = list(axes)[:2] if len(axes) else (0., 0.)
vals = f"Gas: {round(gb * 100.)}%, Steer: {round(steer * 100.)}%"
return NormalPermanentAlert("Joystick Mode", vals)
EVENTS: Dict[int, Dict[str, Union[Alert, AlertCallbackType]]] = {
# ********** events with no alerts **********
EventName.stockFcw: {},
# ********** events only containing alerts displayed in all states **********
EventName.joystickDebug: {
ET.WARNING: joystick_alert,
ET.PERMANENT: NormalPermanentAlert("Joystick Mode"),
},
EventName.controlsInitializing: {
ET.NO_ENTRY: NoEntryAlert("System Initializing"),
},
EventName.startup: {
ET.PERMANENT: StartupAlert("Be ready to take over at any time")
},
EventName.startupMaster: {
ET.PERMANENT: startup_master_alert,
},
# Car is recognized, but marked as dashcam only
EventName.startupNoControl: {
ET.PERMANENT: StartupAlert("Dashcam mode"),
ET.NO_ENTRY: NoEntryAlert("Dashcam mode"),
},
# Car is not recognized
EventName.startupNoCar: {
ET.PERMANENT: StartupAlert("Dashcam mode for unsupported car"),
},
EventName.startupNoFw: {
ET.PERMANENT: StartupAlert("Car Unrecognized",
"Check comma power connections",
alert_status=AlertStatus.userPrompt),
},
EventName.dashcamMode: {
ET.PERMANENT: NormalPermanentAlert("Dashcam Mode",
priority=Priority.LOWEST),
},
EventName.invalidLkasSetting: {
ET.PERMANENT: NormalPermanentAlert("Stock LKAS is on",
"Turn off stock LKAS to engage"),
},
EventName.cruiseMismatch: {
#ET.PERMANENT: ImmediateDisableAlert("openpilot failed to cancel cruise"),
},
# openpilot doesn't recognize the car. This switches openpilot into a
# read-only mode. This can be solved by adding your fingerprint.
# See https://github.com/commaai/openpilot/wiki/Fingerprinting for more information
EventName.carUnrecognized: {
ET.PERMANENT: NormalPermanentAlert("Dashcam Mode",
"Car Unrecognized",
priority=Priority.LOWEST),
},
EventName.stockAeb: {
ET.PERMANENT: Alert(
"BRAKE!",
"Stock AEB: Risk of Collision",
AlertStatus.critical, AlertSize.full,
Priority.HIGHEST, VisualAlert.fcw, AudibleAlert.none, 2.),
ET.NO_ENTRY: NoEntryAlert("Stock AEB: Risk of Collision"),
},
EventName.fcw: {
ET.PERMANENT: Alert(
"BRAKE!",
"Risk of Collision",
AlertStatus.critical, AlertSize.full,
Priority.HIGHEST, VisualAlert.fcw, AudibleAlert.warningSoft, 2.),
},
EventName.ldw: {
ET.PERMANENT: Alert(
"Lane Departure Detected",
"",
AlertStatus.userPrompt, AlertSize.small,
Priority.LOW, VisualAlert.ldw, AudibleAlert.prompt, 3.),
},
# ********** events only containing alerts that display while engaged **********
EventName.steerTempUnavailableSilent: {
ET.WARNING: Alert(
"Steering Temporarily Unavailable",
"",
AlertStatus.userPrompt, AlertSize.small,
Priority.LOW, VisualAlert.steerRequired, AudibleAlert.prompt, 1.8),
},
EventName.preDriverDistracted: {
ET.WARNING: Alert(
"Pay Attention",
"",
AlertStatus.normal, AlertSize.small,
Priority.LOW, VisualAlert.none, AudibleAlert.none, .1),
},
EventName.promptDriverDistracted: {
ET.WARNING: Alert(
"Pay Attention",
"Driver Distracted",
AlertStatus.userPrompt, AlertSize.mid,
Priority.MID, VisualAlert.steerRequired, AudibleAlert.promptDistracted, .1),
},
EventName.driverDistracted: {
ET.WARNING: Alert(
"DISENGAGE IMMEDIATELY",
"Driver Distracted",
AlertStatus.critical, AlertSize.full,
Priority.HIGH, VisualAlert.steerRequired, AudibleAlert.warningImmediate, .1),
},
EventName.preDriverUnresponsive: {
ET.WARNING: Alert(
"Touch Steering Wheel: No Face Detected",
"",
AlertStatus.normal, AlertSize.small,
Priority.LOW, VisualAlert.steerRequired, AudibleAlert.none, .1, alert_rate=0.75),
},
EventName.promptDriverUnresponsive: {
ET.WARNING: Alert(
"Touch Steering Wheel",
"Driver Unresponsive",
AlertStatus.userPrompt, AlertSize.mid,
Priority.MID, VisualAlert.steerRequired, AudibleAlert.promptDistracted, .1),
},
EventName.driverUnresponsive: {
ET.WARNING: Alert(
"DISENGAGE IMMEDIATELY",
"Driver Unresponsive",
AlertStatus.critical, AlertSize.full,
Priority.HIGH, VisualAlert.steerRequired, AudibleAlert.warningImmediate, .1),
},
EventName.manualRestart: {
ET.WARNING: Alert(
"TAKE CONTROL",
"Resume Driving Manually",
AlertStatus.userPrompt, AlertSize.mid,
Priority.LOW, VisualAlert.none, AudibleAlert.none, .2),
},
EventName.resumeRequired: {
ET.WARNING: Alert(
"Press Resume to Exit Standstill",
"",
AlertStatus.userPrompt, AlertSize.small,
Priority.MID, VisualAlert.none, AudibleAlert.none, .2),
},
EventName.belowSteerSpeed: {
ET.WARNING: below_steer_speed_alert,
},
EventName.preLaneChangeLeft: {
ET.WARNING: Alert(
"Steer Left to Start Lane Change Once Safe",
"",
AlertStatus.normal, AlertSize.small,
Priority.LOW, VisualAlert.none, AudibleAlert.none, .1, alert_rate=0.75),
},
EventName.preLaneChangeRight: {
ET.WARNING: Alert(
"Steer Right to Start Lane Change Once Safe",
"",
AlertStatus.normal, AlertSize.small,
Priority.LOW, VisualAlert.none, AudibleAlert.none, .1, alert_rate=0.75),
},
EventName.laneChangeBlocked: {
ET.WARNING: Alert(
"Car Detected in Blindspot",
"",
AlertStatus.userPrompt, AlertSize.small,
Priority.LOW, VisualAlert.none, AudibleAlert.prompt, .1),
},
EventName.laneChange: {
ET.WARNING: Alert(
"Changing Lanes",
"",
AlertStatus.normal, AlertSize.small,
Priority.LOW, VisualAlert.none, AudibleAlert.none, .1),
},
EventName.steerSaturated: {
ET.WARNING: Alert(
"Take Control",
"Turn Exceeds Steering Limit",
AlertStatus.userPrompt, AlertSize.mid,
Priority.LOW, VisualAlert.steerRequired, AudibleAlert.promptRepeat, 2.),
},
# Thrown when the fan is driven at >50% but is not rotating
EventName.fanMalfunction: {
ET.PERMANENT: NormalPermanentAlert("Fan Malfunction", "Likely Hardware Issue"),
},
# Camera is not outputting frames
EventName.cameraMalfunction: {
ET.PERMANENT: camera_malfunction_alert,
ET.SOFT_DISABLE: soft_disable_alert("Camera Malfunction"),
ET.NO_ENTRY: NoEntryAlert("Camera Malfunction: Reboot Your Device"),
},
# Camera framerate too low
EventName.cameraFrameRate: {
ET.PERMANENT: NormalPermanentAlert("Camera Frame Rate Low", "Reboot your Device"),
ET.SOFT_DISABLE: soft_disable_alert("Camera Frame Rate Low"),
ET.NO_ENTRY: NoEntryAlert("Camera Frame Rate Low: Reboot Your Device"),
},
# Unused
EventName.gpsMalfunction: {
ET.PERMANENT: NormalPermanentAlert("GPS Malfunction", "Likely Hardware Issue"),
},
EventName.locationdTemporaryError: {
ET.NO_ENTRY: NoEntryAlert("locationd Temporary Error"),
ET.SOFT_DISABLE: soft_disable_alert("locationd Temporary Error"),
},
EventName.locationdPermanentError: {
ET.NO_ENTRY: NoEntryAlert("locationd Permanent Error"),
ET.IMMEDIATE_DISABLE: ImmediateDisableAlert("locationd Permanent Error"),
ET.PERMANENT: NormalPermanentAlert("locationd Permanent Error"),
},
# openpilot tries to learn certain parameters about your car by observing
# how the car behaves to steering inputs from both human and openpilot driving.
# This includes:
# - steer ratio: gear ratio of the steering rack. Steering angle divided by tire angle
# - tire stiffness: how much grip your tires have
# - angle offset: most steering angle sensors are offset and measure a non zero angle when driving straight
# This alert is thrown when any of these values exceed a sanity check. This can be caused by
# bad alignment or bad sensor data. If this happens consistently consider creating an issue on GitHub
EventName.paramsdTemporaryError: {
ET.NO_ENTRY: NoEntryAlert("paramsd Temporary Error"),
ET.SOFT_DISABLE: soft_disable_alert("paramsd Temporary Error"),
},
EventName.paramsdPermanentError: {
ET.NO_ENTRY: NoEntryAlert("paramsd Permanent Error"),
ET.IMMEDIATE_DISABLE: ImmediateDisableAlert("paramsd Permanent Error"),
ET.PERMANENT: NormalPermanentAlert("paramsd Permanent Error"),
},
# ********** events that affect controls state transitions **********
EventName.pcmEnable: {
ET.ENABLE: EngagementAlert(AudibleAlert.engage),
},
EventName.buttonEnable: {
ET.ENABLE: EngagementAlert(AudibleAlert.engage),
},
EventName.pcmDisable: {
ET.USER_DISABLE: EngagementAlert(AudibleAlert.disengage),
},
EventName.buttonCancel: {
ET.USER_DISABLE: EngagementAlert(AudibleAlert.disengage),
ET.NO_ENTRY: NoEntryAlert("Cancel Pressed"),
},
EventName.brakeHold: {
ET.USER_DISABLE: EngagementAlert(AudibleAlert.disengage),
ET.NO_ENTRY: NoEntryAlert("Brake Hold Active"),
},
EventName.parkBrake: {
ET.USER_DISABLE: EngagementAlert(AudibleAlert.disengage),
ET.NO_ENTRY: NoEntryAlert("Parking Brake Engaged"),
},
EventName.pedalPressed: {
ET.USER_DISABLE: EngagementAlert(AudibleAlert.disengage),
ET.NO_ENTRY: NoEntryAlert("Pedal Pressed",
visual_alert=VisualAlert.brakePressed),
},
EventName.preEnableStandstill: {
ET.PRE_ENABLE: Alert(
"Release Brake to Engage",
"",
AlertStatus.normal, AlertSize.small,
Priority.LOWEST, VisualAlert.none, AudibleAlert.none, .1, creation_delay=1.),
},
EventName.gasPressedOverride: {
ET.OVERRIDE_LONGITUDINAL: Alert(
"",
"",
AlertStatus.normal, AlertSize.none,
Priority.LOWEST, VisualAlert.none, AudibleAlert.none, .1),
},
EventName.steerOverride: {
ET.OVERRIDE_LATERAL: Alert(
"",
"",
AlertStatus.normal, AlertSize.none,
Priority.LOWEST, VisualAlert.none, AudibleAlert.none, .1),
},
EventName.wrongCarMode: {
ET.USER_DISABLE: EngagementAlert(AudibleAlert.disengage),
ET.NO_ENTRY: wrong_car_mode_alert,
},
EventName.resumeBlocked: {
ET.NO_ENTRY: NoEntryAlert("Press Set to Engage"),
},
EventName.wrongCruiseMode: {
ET.USER_DISABLE: EngagementAlert(AudibleAlert.disengage),
ET.NO_ENTRY: NoEntryAlert("Adaptive Cruise Disabled"),
},
EventName.steerTempUnavailable: {
ET.SOFT_DISABLE: soft_disable_alert("Steering Temporarily Unavailable"),
ET.NO_ENTRY: NoEntryAlert("Steering Temporarily Unavailable"),
},
EventName.steerTimeLimit: {
ET.SOFT_DISABLE: soft_disable_alert("Vehicle Steering Time Limit"),
ET.NO_ENTRY: NoEntryAlert("Vehicle Steering Time Limit"),
},
EventName.outOfSpace: {
ET.PERMANENT: out_of_space_alert,
ET.NO_ENTRY: NoEntryAlert("Out of Storage"),
},
EventName.belowEngageSpeed: {
ET.NO_ENTRY: below_engage_speed_alert,
},
EventName.sensorDataInvalid: {
ET.PERMANENT: Alert(
"Sensor Data Invalid",
"Possible Hardware Issue",
AlertStatus.normal, AlertSize.mid,
Priority.LOWER, VisualAlert.none, AudibleAlert.none, .2, creation_delay=1.),
ET.NO_ENTRY: NoEntryAlert("Sensor Data Invalid"),
ET.SOFT_DISABLE: soft_disable_alert("Sensor Data Invalid"),
},
EventName.noGps: {
ET.PERMANENT: no_gps_alert,
},
EventName.soundsUnavailable: {
ET.PERMANENT: NormalPermanentAlert("Speaker not found", "Reboot your Device"),
ET.NO_ENTRY: NoEntryAlert("Speaker not found"),
},
EventName.tooDistracted: {
ET.NO_ENTRY: NoEntryAlert("Distraction Level Too High"),
},
EventName.overheat: {
ET.PERMANENT: overheat_alert,
ET.SOFT_DISABLE: soft_disable_alert("System Overheated"),
ET.NO_ENTRY: NoEntryAlert("System Overheated"),
},
EventName.wrongGear: {
ET.SOFT_DISABLE: user_soft_disable_alert("Gear not D"),
ET.NO_ENTRY: NoEntryAlert("Gear not D"),
},
# This alert is thrown when the calibration angles are outside of the acceptable range.
# For example if the device is pointed too much to the left or the right.
# Usually this can only be solved by removing the mount from the windshield completely,
# and attaching while making sure the device is pointed straight forward and is level.
# See https://comma.ai/setup for more information
EventName.calibrationInvalid: {
ET.PERMANENT: calibration_invalid_alert,
ET.SOFT_DISABLE: soft_disable_alert("Calibration Invalid: Remount Device & Recalibrate"),
ET.NO_ENTRY: NoEntryAlert("Calibration Invalid: Remount Device & Recalibrate"),
},
EventName.calibrationIncomplete: {
ET.PERMANENT: calibration_incomplete_alert,
ET.SOFT_DISABLE: soft_disable_alert("Calibration Incomplete"),
ET.NO_ENTRY: NoEntryAlert("Calibration in Progress"),
},
EventName.calibrationRecalibrating: {
ET.PERMANENT: calibration_incomplete_alert,
ET.SOFT_DISABLE: soft_disable_alert("Device Remount Detected: Recalibrating"),
ET.NO_ENTRY: NoEntryAlert("Remount Detected: Recalibrating"),
},
EventName.doorOpen: {
ET.SOFT_DISABLE: user_soft_disable_alert("Door Open"),
ET.NO_ENTRY: NoEntryAlert("Door Open"),
},
EventName.seatbeltNotLatched: {
ET.SOFT_DISABLE: user_soft_disable_alert("Seatbelt Unlatched"),
ET.NO_ENTRY: NoEntryAlert("Seatbelt Unlatched"),
},
EventName.espDisabled: {
ET.SOFT_DISABLE: soft_disable_alert("Electronic Stability Control Disabled"),
ET.NO_ENTRY: NoEntryAlert("Electronic Stability Control Disabled"),
},
EventName.lowBattery: {
ET.SOFT_DISABLE: soft_disable_alert("Low Battery"),
ET.NO_ENTRY: NoEntryAlert("Low Battery"),
},
# Different openpilot services communicate between each other at a certain
# interval. If communication does not follow the regular schedule this alert
# is thrown. This can mean a service crashed, did not broadcast a message for
# ten times the regular interval, or the average interval is more than 10% too high.
EventName.commIssue: {
ET.SOFT_DISABLE: soft_disable_alert("Communication Issue between Processes"),
ET.NO_ENTRY: comm_issue_alert,
},
EventName.commIssueAvgFreq: {
ET.SOFT_DISABLE: soft_disable_alert("Low Communication Rate between Processes"),
ET.NO_ENTRY: NoEntryAlert("Low Communication Rate between Processes"),
},
EventName.controlsdLagging: {
ET.SOFT_DISABLE: soft_disable_alert("Controls Lagging"),
ET.NO_ENTRY: NoEntryAlert("Controls Process Lagging: Reboot Your Device"),
},
# Thrown when manager detects a service exited unexpectedly while driving
EventName.processNotRunning: {
ET.NO_ENTRY: process_not_running_alert,
ET.SOFT_DISABLE: soft_disable_alert("Process Not Running"),
},
EventName.radarFault: {
ET.SOFT_DISABLE: soft_disable_alert("Radar Error: Restart the Car"),
ET.NO_ENTRY: NoEntryAlert("Radar Error: Restart the Car"),
},
# Every frame from the camera should be processed by the model. If modeld
# is not processing frames fast enough they have to be dropped. This alert is
# thrown when over 20% of frames are dropped.
EventName.modeldLagging: {
ET.SOFT_DISABLE: soft_disable_alert("Driving Model Lagging"),
ET.NO_ENTRY: NoEntryAlert("Driving Model Lagging"),
ET.PERMANENT: modeld_lagging_alert,
},
# Besides predicting the path, lane lines and lead car data the model also
# predicts the current velocity and rotation speed of the car. If the model is
# very uncertain about the current velocity while the car is moving, this
# usually means the model has trouble understanding the scene. This is used
# as a heuristic to warn the driver.
EventName.posenetInvalid: {
ET.SOFT_DISABLE: soft_disable_alert("Posenet Speed Invalid"),
ET.NO_ENTRY: posenet_invalid_alert,
},
# When the localizer detects an acceleration of more than 40 m/s^2 (~4G) we
# alert the driver the device might have fallen from the windshield.
EventName.deviceFalling: {
ET.SOFT_DISABLE: soft_disable_alert("Device Fell Off Mount"),
ET.NO_ENTRY: NoEntryAlert("Device Fell Off Mount"),
},
EventName.lowMemory: {
ET.SOFT_DISABLE: soft_disable_alert("Low Memory: Reboot Your Device"),
ET.PERMANENT: low_memory_alert,
ET.NO_ENTRY: NoEntryAlert("Low Memory: Reboot Your Device"),
},
EventName.highCpuUsage: {
#ET.SOFT_DISABLE: soft_disable_alert("System Malfunction: Reboot Your Device"),
#ET.PERMANENT: NormalPermanentAlert("System Malfunction", "Reboot your Device"),
ET.NO_ENTRY: high_cpu_usage_alert,
},
EventName.accFaulted: {
ET.IMMEDIATE_DISABLE: ImmediateDisableAlert("Cruise Fault: Restart the Car"),
ET.PERMANENT: NormalPermanentAlert("Cruise Fault: Restart the car to engage"),
ET.NO_ENTRY: NoEntryAlert("Cruise Fault: Restart the Car"),
},
EventName.controlsMismatch: {
ET.IMMEDIATE_DISABLE: ImmediateDisableAlert("Controls Mismatch"),
ET.NO_ENTRY: NoEntryAlert("Controls Mismatch"),
},
EventName.roadCameraError: {
ET.PERMANENT: NormalPermanentAlert("Camera CRC Error - Road",
duration=1.,
creation_delay=30.),
},
EventName.wideRoadCameraError: {
ET.PERMANENT: NormalPermanentAlert("Camera CRC Error - Road Fisheye",
duration=1.,
creation_delay=30.),
},
EventName.driverCameraError: {
ET.PERMANENT: NormalPermanentAlert("Camera CRC Error - Driver",
duration=1.,
creation_delay=30.),
},
# Sometimes the USB stack on the device can get into a bad state
# causing the connection to the panda to be lost
EventName.usbError: {
ET.SOFT_DISABLE: soft_disable_alert("USB Error: Reboot Your Device"),
ET.PERMANENT: NormalPermanentAlert("USB Error: Reboot Your Device", ""),
ET.NO_ENTRY: NoEntryAlert("USB Error: Reboot Your Device"),
},
# This alert can be thrown for the following reasons:
# - No CAN data received at all
# - CAN data is received, but some message are not received at the right frequency
# If you're not writing a new car port, this is usually cause by faulty wiring
EventName.canError: {
ET.IMMEDIATE_DISABLE: ImmediateDisableAlert("CAN Error"),
ET.PERMANENT: Alert(
"CAN Error: Check Connections",
"",
AlertStatus.normal, AlertSize.small,
Priority.LOW, VisualAlert.none, AudibleAlert.none, 1., creation_delay=1.),
ET.NO_ENTRY: NoEntryAlert("CAN Error: Check Connections"),
},
EventName.canBusMissing: {
ET.IMMEDIATE_DISABLE: ImmediateDisableAlert("CAN Bus Disconnected"),
ET.PERMANENT: Alert(
"CAN Bus Disconnected: Likely Faulty Cable",
"",
AlertStatus.normal, AlertSize.small,
Priority.LOW, VisualAlert.none, AudibleAlert.none, 1., creation_delay=1.),
ET.NO_ENTRY: NoEntryAlert("CAN Bus Disconnected: Check Connections"),
},
EventName.steerUnavailable: {
ET.IMMEDIATE_DISABLE: ImmediateDisableAlert("LKAS Fault: Restart the Car"),
ET.PERMANENT: NormalPermanentAlert("LKAS Fault: Restart the car to engage"),
ET.NO_ENTRY: NoEntryAlert("LKAS Fault: Restart the Car"),
},
EventName.reverseGear: {
ET.PERMANENT: Alert(
"Reverse\nGear",
"",
AlertStatus.normal, AlertSize.full,
Priority.LOWEST, VisualAlert.none, AudibleAlert.none, .2, creation_delay=0.5),
ET.USER_DISABLE: ImmediateDisableAlert("Reverse Gear"),
ET.NO_ENTRY: NoEntryAlert("Reverse Gear"),
},
# On cars that use stock ACC the car can decide to cancel ACC for various reasons.
# When this happens we can no long control the car so the user needs to be warned immediately.
EventName.cruiseDisabled: {
ET.IMMEDIATE_DISABLE: ImmediateDisableAlert("Cruise Is Off"),
},
# For planning the trajectory Model Predictive Control (MPC) is used. This is
# an optimization algorithm that is not guaranteed to find a feasible solution.
# If no solution is found or the solution has a very high cost this alert is thrown.
EventName.plannerError: {
ET.IMMEDIATE_DISABLE: ImmediateDisableAlert("Planner Solution Error"),
ET.NO_ENTRY: NoEntryAlert("Planner Solution Error"),
},
# When the relay in the harness box opens the CAN bus between the LKAS camera
# and the rest of the car is separated. When messages from the LKAS camera
# are received on the car side this usually means the relay hasn't opened correctly
# and this alert is thrown.
EventName.relayMalfunction: {
ET.IMMEDIATE_DISABLE: ImmediateDisableAlert("Harness Relay Malfunction"),
ET.PERMANENT: NormalPermanentAlert("Harness Relay Malfunction", "Check Hardware"),
ET.NO_ENTRY: NoEntryAlert("Harness Relay Malfunction"),
},
EventName.speedTooLow: {
ET.IMMEDIATE_DISABLE: Alert(
"openpilot Canceled",
"Speed too low",
AlertStatus.normal, AlertSize.mid,
Priority.HIGH, VisualAlert.none, AudibleAlert.disengage, 3.),
},
# When the car is driving faster than most cars in the training data, the model outputs can be unpredictable.
EventName.speedTooHigh: {
ET.WARNING: Alert(
"Speed Too High",
"Model uncertain at this speed",
AlertStatus.userPrompt, AlertSize.mid,
Priority.HIGH, VisualAlert.steerRequired, AudibleAlert.promptRepeat, 4.),
ET.NO_ENTRY: NoEntryAlert("Slow down to engage"),
},
EventName.lowSpeedLockout: {
ET.PERMANENT: NormalPermanentAlert("Cruise Fault: Restart the car to engage"),
ET.NO_ENTRY: NoEntryAlert("Cruise Fault: Restart the Car"),
},
EventName.lkasDisabled: {
ET.PERMANENT: NormalPermanentAlert("LKAS Disabled: Enable LKAS to engage"),
ET.NO_ENTRY: NoEntryAlert("LKAS Disabled"),
},
EventName.vehicleSensorsInvalid: {
ET.IMMEDIATE_DISABLE: ImmediateDisableAlert("Vehicle Sensors Invalid"),
ET.PERMANENT: NormalPermanentAlert("Vehicle Sensors Calibrating", "Drive to Calibrate"),
ET.NO_ENTRY: NoEntryAlert("Vehicle Sensors Calibrating"),
},
}
if __name__ == '__main__':
# print all alerts by type and priority
from cereal.services import SERVICE_LIST
from collections import defaultdict
event_names = {v: k for k, v in EventName.schema.enumerants.items()}
alerts_by_type: Dict[str, Dict[Priority, List[str]]] = defaultdict(lambda: defaultdict(list))
CP = car.CarParams.new_message()
CS = car.CarState.new_message()
sm = messaging.SubMaster(list(SERVICE_LIST.keys()))
for i, alerts in EVENTS.items():
for et, alert in alerts.items():
if callable(alert):
alert = alert(CP, CS, sm, False, 1)
alerts_by_type[et][alert.priority].append(event_names[i])
all_alerts: Dict[str, List[tuple[Priority, List[str]]]] = {}
for et, priority_alerts in alerts_by_type.items():
all_alerts[et] = sorted(priority_alerts.items(), key=lambda x: x[0], reverse=True)
for status, evs in sorted(all_alerts.items(), key=lambda x: x[0]):
print(f"**** {status} ****")
for p, alert_list in evs:
print(f" {repr(p)}:")
print(" ", ', '.join(alert_list), "\n")

View File

@@ -0,0 +1,32 @@
from abc import abstractmethod, ABC
from openpilot.common.numpy_fast import clip
from openpilot.common.realtime import DT_CTRL
MIN_LATERAL_CONTROL_SPEED = 0.3 # m/s
class LatControl(ABC):
def __init__(self, CP, CI):
self.sat_count_rate = 1.0 * DT_CTRL
self.sat_limit = CP.steerLimitTimer
self.sat_count = 0.
self.sat_check_min_speed = 10.
# we define the steer torque scale as [-1.0...1.0]
self.steer_max = 1.0
@abstractmethod
def update(self, active, CS, VM, params, steer_limited, desired_curvature, desired_curvature_rate, llk):
pass
def reset(self):
self.sat_count = 0.
def _check_saturation(self, saturated, CS, steer_limited):
if saturated and CS.vEgo > self.sat_check_min_speed and not steer_limited and not CS.steeringPressed:
self.sat_count += self.sat_count_rate
else:
self.sat_count -= self.sat_count_rate
self.sat_count = clip(self.sat_count, 0.0, self.sat_limit)
return self.sat_count > (self.sat_limit - 1e-3)

View File

@@ -0,0 +1,29 @@
import math
from cereal import log
from openpilot.selfdrive.controls.lib.latcontrol import LatControl
STEER_ANGLE_SATURATION_THRESHOLD = 2.5 # Degrees
class LatControlAngle(LatControl):
def __init__(self, CP, CI):
super().__init__(CP, CI)
self.sat_check_min_speed = 5.
def update(self, active, CS, VM, params, steer_limited, desired_curvature, desired_curvature_rate, llk):
angle_log = log.ControlsState.LateralAngleState.new_message()
if not active:
angle_log.active = False
angle_steers_des = float(CS.steeringAngleDeg)
else:
angle_log.active = True
angle_steers_des = math.degrees(VM.get_steer_from_curvature(-desired_curvature, CS.vEgo, params.roll))
angle_steers_des += params.angleOffsetDeg
angle_control_saturated = abs(angle_steers_des - CS.steeringAngleDeg) > STEER_ANGLE_SATURATION_THRESHOLD
angle_log.saturated = self._check_saturation(angle_control_saturated, CS, False)
angle_log.steeringAngleDeg = float(CS.steeringAngleDeg)
angle_log.steeringAngleDesiredDeg = angle_steers_des
return 0, float(angle_steers_des), angle_log

View File

@@ -0,0 +1,48 @@
import math
from cereal import log
from openpilot.selfdrive.controls.lib.latcontrol import LatControl
from openpilot.selfdrive.controls.lib.pid import PIDController
class LatControlPID(LatControl):
def __init__(self, CP, CI):
super().__init__(CP, CI)
self.pid = PIDController((CP.lateralTuning.pid.kpBP, CP.lateralTuning.pid.kpV),
(CP.lateralTuning.pid.kiBP, CP.lateralTuning.pid.kiV),
k_f=CP.lateralTuning.pid.kf, pos_limit=self.steer_max, neg_limit=-self.steer_max)
self.get_steer_feedforward = CI.get_steer_feedforward_function()
def reset(self):
super().reset()
self.pid.reset()
def update(self, active, CS, VM, params, steer_limited, desired_curvature, desired_curvature_rate, llk):
pid_log = log.ControlsState.LateralPIDState.new_message()
pid_log.steeringAngleDeg = float(CS.steeringAngleDeg)
pid_log.steeringRateDeg = float(CS.steeringRateDeg)
angle_steers_des_no_offset = math.degrees(VM.get_steer_from_curvature(-desired_curvature, CS.vEgo, params.roll))
angle_steers_des = angle_steers_des_no_offset + params.angleOffsetDeg
error = angle_steers_des - CS.steeringAngleDeg
pid_log.steeringAngleDesiredDeg = angle_steers_des
pid_log.angleError = error
if not active:
output_steer = 0.0
pid_log.active = False
self.pid.reset()
else:
# offset does not contribute to resistive torque
steer_feedforward = self.get_steer_feedforward(angle_steers_des_no_offset, CS.vEgo)
output_steer = self.pid.update(error, override=CS.steeringPressed,
feedforward=steer_feedforward, speed=CS.vEgo)
pid_log.active = True
pid_log.p = self.pid.p
pid_log.i = self.pid.i
pid_log.f = self.pid.f
pid_log.output = output_steer
pid_log.saturated = self._check_saturation(self.steer_max - abs(output_steer) < 1e-3, CS, steer_limited)
return output_steer, angle_steers_des, pid_log

View File

@@ -0,0 +1,91 @@
import math
from cereal import log
from openpilot.common.numpy_fast import interp
from openpilot.selfdrive.controls.lib.latcontrol import LatControl
from openpilot.selfdrive.controls.lib.pid import PIDController
from openpilot.selfdrive.controls.lib.vehicle_model import ACCELERATION_DUE_TO_GRAVITY
# At higher speeds (25+mph) we can assume:
# Lateral acceleration achieved by a specific car correlates to
# torque applied to the steering rack. It does not correlate to
# wheel slip, or to speed.
# This controller applies torque to achieve desired lateral
# accelerations. To compensate for the low speed effects we
# use a LOW_SPEED_FACTOR in the error. Additionally, there is
# friction in the steering wheel that needs to be overcome to
# move it at all, this is compensated for too.
LOW_SPEED_X = [0, 10, 20, 30]
LOW_SPEED_Y = [15, 13, 10, 5]
class LatControlTorque(LatControl):
def __init__(self, CP, CI):
super().__init__(CP, CI)
self.torque_params = CP.lateralTuning.torque
self.pid = PIDController(self.torque_params.kp, self.torque_params.ki,
k_f=self.torque_params.kf, pos_limit=self.steer_max, neg_limit=-self.steer_max)
self.torque_from_lateral_accel = CI.torque_from_lateral_accel()
self.use_steering_angle = self.torque_params.useSteeringAngle
self.steering_angle_deadzone_deg = self.torque_params.steeringAngleDeadzoneDeg
def update_live_torque_params(self, latAccelFactor, latAccelOffset, friction):
self.torque_params.latAccelFactor = latAccelFactor
self.torque_params.latAccelOffset = latAccelOffset
self.torque_params.friction = friction
def update(self, active, CS, VM, params, steer_limited, desired_curvature, desired_curvature_rate, llk):
pid_log = log.ControlsState.LateralTorqueState.new_message()
if not active:
output_torque = 0.0
pid_log.active = False
else:
if self.use_steering_angle:
actual_curvature = -VM.calc_curvature(math.radians(CS.steeringAngleDeg - params.angleOffsetDeg), CS.vEgo, params.roll)
curvature_deadzone = abs(VM.calc_curvature(math.radians(self.steering_angle_deadzone_deg), CS.vEgo, 0.0))
else:
actual_curvature_vm = -VM.calc_curvature(math.radians(CS.steeringAngleDeg - params.angleOffsetDeg), CS.vEgo, params.roll)
actual_curvature_llk = llk.angularVelocityCalibrated.value[2] / CS.vEgo
actual_curvature = interp(CS.vEgo, [2.0, 5.0], [actual_curvature_vm, actual_curvature_llk])
curvature_deadzone = 0.0
desired_lateral_accel = desired_curvature * CS.vEgo ** 2
# desired rate is the desired rate of change in the setpoint, not the absolute desired curvature
# desired_lateral_jerk = desired_curvature_rate * CS.vEgo ** 2
actual_lateral_accel = actual_curvature * CS.vEgo ** 2
lateral_accel_deadzone = curvature_deadzone * CS.vEgo ** 2
low_speed_factor = interp(CS.vEgo, LOW_SPEED_X, LOW_SPEED_Y)**2
setpoint = desired_lateral_accel + low_speed_factor * desired_curvature
measurement = actual_lateral_accel + low_speed_factor * actual_curvature
gravity_adjusted_lateral_accel = desired_lateral_accel - params.roll * ACCELERATION_DUE_TO_GRAVITY
torque_from_setpoint = self.torque_from_lateral_accel(setpoint, self.torque_params, setpoint,
lateral_accel_deadzone, friction_compensation=False)
torque_from_measurement = self.torque_from_lateral_accel(measurement, self.torque_params, measurement,
lateral_accel_deadzone, friction_compensation=False)
pid_log.error = torque_from_setpoint - torque_from_measurement
ff = self.torque_from_lateral_accel(gravity_adjusted_lateral_accel, self.torque_params,
desired_lateral_accel - actual_lateral_accel,
lateral_accel_deadzone, friction_compensation=True)
freeze_integrator = steer_limited or CS.steeringPressed or CS.vEgo < 5
output_torque = self.pid.update(pid_log.error,
feedforward=ff,
speed=CS.vEgo,
freeze_integrator=freeze_integrator)
pid_log.active = True
pid_log.p = self.pid.p
pid_log.i = self.pid.i
pid_log.d = self.pid.d
pid_log.f = self.pid.f
pid_log.output = -output_torque
pid_log.actualLateralAccel = actual_lateral_accel
pid_log.desiredLateralAccel = desired_lateral_accel
pid_log.saturated = self._check_saturation(self.steer_max - abs(output_torque) < 1e-3, CS, steer_limited)
# TODO left is positive in this convention
return -output_torque, 0.0, pid_log

View File

@@ -0,0 +1,2 @@
acados_ocp_lat.json
c_generated_code/

View File

@@ -0,0 +1,89 @@
Import('env', 'envCython', 'arch')
gen = "c_generated_code"
casadi_model = [
f'{gen}/lat_model/lat_expl_ode_fun.c',
f'{gen}/lat_model/lat_expl_vde_forw.c',
]
casadi_cost_y = [
f'{gen}/lat_cost/lat_cost_y_fun.c',
f'{gen}/lat_cost/lat_cost_y_fun_jac_ut_xt.c',
f'{gen}/lat_cost/lat_cost_y_hess.c',
]
casadi_cost_e = [
f'{gen}/lat_cost/lat_cost_y_e_fun.c',
f'{gen}/lat_cost/lat_cost_y_e_fun_jac_ut_xt.c',
f'{gen}/lat_cost/lat_cost_y_e_hess.c',
]
casadi_cost_0 = [
f'{gen}/lat_cost/lat_cost_y_0_fun.c',
f'{gen}/lat_cost/lat_cost_y_0_fun_jac_ut_xt.c',
f'{gen}/lat_cost/lat_cost_y_0_hess.c',
]
build_files = [f'{gen}/acados_solver_lat.c'] + casadi_model + casadi_cost_y + casadi_cost_e + casadi_cost_0
# extra generated files used to trigger a rebuild
generated_files = [
f'{gen}/Makefile',
f'{gen}/main_lat.c',
f'{gen}/main_sim_lat.c',
f'{gen}/acados_solver_lat.h',
f'{gen}/acados_sim_solver_lat.h',
f'{gen}/acados_sim_solver_lat.c',
f'{gen}/acados_solver.pxd',
f'{gen}/lat_model/lat_expl_vde_adj.c',
f'{gen}/lat_model/lat_model.h',
f'{gen}/lat_constraints/lat_constraints.h',
f'{gen}/lat_cost/lat_cost.h',
] + build_files
acados_dir = '#third_party/acados'
acados_templates_dir = '#third_party/acados/acados_template/c_templates_tera'
source_list = ['lat_mpc.py',
'#selfdrive/modeld/constants.py',
f'{acados_dir}/include/acados_c/ocp_nlp_interface.h',
f'{acados_templates_dir}/acados_solver.in.c',
]
lenv = env.Clone()
lenv.Clean(generated_files, Dir(gen))
generated_lat = lenv.Command(generated_files,
source_list,
f"cd {Dir('.').abspath} && python3 lat_mpc.py")
lenv["CFLAGS"].append("-DACADOS_WITH_QPOASES")
lenv["CXXFLAGS"].append("-DACADOS_WITH_QPOASES")
lenv["CCFLAGS"].append("-Wno-unused")
if arch != "Darwin":
lenv["LINKFLAGS"].append("-Wl,--disable-new-dtags")
lib_solver = lenv.SharedLibrary(f"{gen}/acados_ocp_solver_lat",
build_files,
LIBS=['m', 'acados', 'hpipm', 'blasfeo', 'qpOASES_e'])
# generate cython stuff
acados_ocp_solver_pyx = File("#third_party/acados/acados_template/acados_ocp_solver_pyx.pyx")
acados_ocp_solver_common = File("#third_party/acados/acados_template/acados_solver_common.pxd")
libacados_ocp_solver_pxd = File(f'{gen}/acados_solver.pxd')
libacados_ocp_solver_c = File(f'{gen}/acados_ocp_solver_pyx.c')
lenv2 = envCython.Clone()
lenv2["LINKFLAGS"] += [lib_solver[0].get_labspath()]
lenv2.Command(libacados_ocp_solver_c,
[acados_ocp_solver_pyx, acados_ocp_solver_common, libacados_ocp_solver_pxd],
f'cython' + \
f' -o {libacados_ocp_solver_c.get_labspath()}' + \
f' -I {libacados_ocp_solver_pxd.get_dir().get_labspath()}' + \
f' -I {acados_ocp_solver_common.get_dir().get_labspath()}' + \
f' {acados_ocp_solver_pyx.get_labspath()}')
lib_cython = lenv2.Program(f'{gen}/acados_ocp_solver_pyx.so', [libacados_ocp_solver_c])
lenv2.Depends(lib_cython, lib_solver)

View File

@@ -0,0 +1,199 @@
#!/usr/bin/env python3
import os
import time
import numpy as np
from casadi import SX, vertcat, sin, cos
# WARNING: imports outside of constants will not trigger a rebuild
from openpilot.selfdrive.modeld.constants import ModelConstants
if __name__ == '__main__': # generating code
from openpilot.third_party.acados.acados_template import AcadosModel, AcadosOcp, AcadosOcpSolver
else:
from openpilot.selfdrive.controls.lib.lateral_mpc_lib.c_generated_code.acados_ocp_solver_pyx import AcadosOcpSolverCython
LAT_MPC_DIR = os.path.dirname(os.path.abspath(__file__))
EXPORT_DIR = os.path.join(LAT_MPC_DIR, "c_generated_code")
JSON_FILE = os.path.join(LAT_MPC_DIR, "acados_ocp_lat.json")
X_DIM = 4
P_DIM = 2
COST_E_DIM = 3
COST_DIM = COST_E_DIM + 2
SPEED_OFFSET = 10.0
MODEL_NAME = 'lat'
ACADOS_SOLVER_TYPE = 'SQP_RTI'
N = 32
def gen_lat_model():
model = AcadosModel()
model.name = MODEL_NAME
# set up states & controls
x_ego = SX.sym('x_ego')
y_ego = SX.sym('y_ego')
psi_ego = SX.sym('psi_ego')
psi_rate_ego = SX.sym('psi_rate_ego')
model.x = vertcat(x_ego, y_ego, psi_ego, psi_rate_ego)
# parameters
v_ego = SX.sym('v_ego')
rotation_radius = SX.sym('rotation_radius')
model.p = vertcat(v_ego, rotation_radius)
# controls
psi_accel_ego = SX.sym('psi_accel_ego')
model.u = vertcat(psi_accel_ego)
# xdot
x_ego_dot = SX.sym('x_ego_dot')
y_ego_dot = SX.sym('y_ego_dot')
psi_ego_dot = SX.sym('psi_ego_dot')
psi_rate_ego_dot = SX.sym('psi_rate_ego_dot')
model.xdot = vertcat(x_ego_dot, y_ego_dot, psi_ego_dot, psi_rate_ego_dot)
# dynamics model
f_expl = vertcat(v_ego * cos(psi_ego) - rotation_radius * sin(psi_ego) * psi_rate_ego,
v_ego * sin(psi_ego) + rotation_radius * cos(psi_ego) * psi_rate_ego,
psi_rate_ego,
psi_accel_ego)
model.f_impl_expr = model.xdot - f_expl
model.f_expl_expr = f_expl
return model
def gen_lat_ocp():
ocp = AcadosOcp()
ocp.model = gen_lat_model()
Tf = np.array(ModelConstants.T_IDXS)[N]
# set dimensions
ocp.dims.N = N
# set cost module
ocp.cost.cost_type = 'NONLINEAR_LS'
ocp.cost.cost_type_e = 'NONLINEAR_LS'
Q = np.diag(np.zeros(COST_E_DIM))
QR = np.diag(np.zeros(COST_DIM))
ocp.cost.W = QR
ocp.cost.W_e = Q
y_ego, psi_ego, psi_rate_ego = ocp.model.x[1], ocp.model.x[2], ocp.model.x[3]
psi_rate_ego_dot = ocp.model.u[0]
v_ego = ocp.model.p[0]
ocp.parameter_values = np.zeros((P_DIM, ))
ocp.cost.yref = np.zeros((COST_DIM, ))
ocp.cost.yref_e = np.zeros((COST_E_DIM, ))
# Add offset to smooth out low speed control
# TODO unclear if this right solution long term
v_ego_offset = v_ego + SPEED_OFFSET
# TODO there are two costs on psi_rate_ego_dot, one
# is correlated to jerk the other to steering wheel movement
# the steering wheel movement cost is added to prevent excessive
# wheel movements
ocp.model.cost_y_expr = vertcat(y_ego,
v_ego_offset * psi_ego,
v_ego_offset * psi_rate_ego,
v_ego_offset * psi_rate_ego_dot,
psi_rate_ego_dot / (v_ego + 0.1))
ocp.model.cost_y_expr_e = vertcat(y_ego,
v_ego_offset * psi_ego,
v_ego_offset * psi_rate_ego)
# set constraints
ocp.constraints.constr_type = 'BGH'
ocp.constraints.idxbx = np.array([2,3])
ocp.constraints.ubx = np.array([np.radians(90), np.radians(50)])
ocp.constraints.lbx = np.array([-np.radians(90), -np.radians(50)])
x0 = np.zeros((X_DIM,))
ocp.constraints.x0 = x0
ocp.solver_options.qp_solver = 'PARTIAL_CONDENSING_HPIPM'
ocp.solver_options.hessian_approx = 'GAUSS_NEWTON'
ocp.solver_options.integrator_type = 'ERK'
ocp.solver_options.nlp_solver_type = ACADOS_SOLVER_TYPE
ocp.solver_options.qp_solver_iter_max = 1
ocp.solver_options.qp_solver_cond_N = 1
# set prediction horizon
ocp.solver_options.tf = Tf
ocp.solver_options.shooting_nodes = np.array(ModelConstants.T_IDXS)[:N+1]
ocp.code_export_directory = EXPORT_DIR
return ocp
class LateralMpc():
def __init__(self, x0=None):
if x0 is None:
x0 = np.zeros(X_DIM)
self.solver = AcadosOcpSolverCython(MODEL_NAME, ACADOS_SOLVER_TYPE, N)
self.reset(x0)
def reset(self, x0=None):
if x0 is None:
x0 = np.zeros(X_DIM)
self.x_sol = np.zeros((N+1, X_DIM))
self.u_sol = np.zeros((N, 1))
self.yref = np.zeros((N+1, COST_DIM))
for i in range(N):
self.solver.cost_set(i, "yref", self.yref[i])
self.solver.cost_set(N, "yref", self.yref[N][:COST_E_DIM])
# Somehow needed for stable init
for i in range(N+1):
self.solver.set(i, 'x', np.zeros(X_DIM))
self.solver.set(i, 'p', np.zeros(P_DIM))
self.solver.constraints_set(0, "lbx", x0)
self.solver.constraints_set(0, "ubx", x0)
self.solver.solve()
self.solution_status = 0
self.solve_time = 0.0
self.cost = 0
def set_weights(self, path_weight, heading_weight,
lat_accel_weight, lat_jerk_weight,
steering_rate_weight):
W = np.asfortranarray(np.diag([path_weight, heading_weight,
lat_accel_weight, lat_jerk_weight,
steering_rate_weight]))
for i in range(N):
self.solver.cost_set(i, 'W', W)
self.solver.cost_set(N, 'W', W[:COST_E_DIM,:COST_E_DIM])
def run(self, x0, p, y_pts, heading_pts, yaw_rate_pts):
x0_cp = np.copy(x0)
p_cp = np.copy(p)
self.solver.constraints_set(0, "lbx", x0_cp)
self.solver.constraints_set(0, "ubx", x0_cp)
self.yref[:,0] = y_pts
v_ego = p_cp[0, 0]
# rotation_radius = p_cp[1]
self.yref[:,1] = heading_pts * (v_ego + SPEED_OFFSET)
self.yref[:,2] = yaw_rate_pts * (v_ego + SPEED_OFFSET)
for i in range(N):
self.solver.cost_set(i, "yref", self.yref[i])
self.solver.set(i, "p", p_cp[i])
self.solver.set(N, "p", p_cp[N])
self.solver.cost_set(N, "yref", self.yref[N][:COST_E_DIM])
t = time.monotonic()
self.solution_status = self.solver.solve()
self.solve_time = time.monotonic() - t
for i in range(N+1):
self.x_sol[i] = self.solver.get(i, 'x')
for i in range(N):
self.u_sol[i] = self.solver.get(i, 'u')
self.cost = self.solver.get_cost()
if __name__ == "__main__":
ocp = gen_lat_ocp()
AcadosOcpSolver.generate(ocp, json_file=JSON_FILE)
# AcadosOcpSolver.build(ocp.code_export_directory, with_cython=True)

View File

@@ -0,0 +1,74 @@
import numpy as np
from openpilot.selfdrive.controls.lib.drive_helpers import CONTROL_N, MIN_SPEED, get_speed_error
from openpilot.selfdrive.controls.lib.desire_helper import DesireHelper
import cereal.messaging as messaging
from cereal import log
TRAJECTORY_SIZE = 33
CAMERA_OFFSET = 0.04
class LateralPlanner:
def __init__(self, CP, debug=False):
self.DH = DesireHelper()
# Vehicle model parameters used to calculate lateral movement of car
self.factor1 = CP.wheelbase - CP.centerToFront
self.factor2 = (CP.centerToFront * CP.mass) / (CP.wheelbase * CP.tireStiffnessRear)
self.last_cloudlog_t = 0
self.solution_invalid_cnt = 0
self.path_xyz = np.zeros((TRAJECTORY_SIZE, 3))
self.velocity_xyz = np.zeros((TRAJECTORY_SIZE, 3))
self.v_plan = np.zeros((TRAJECTORY_SIZE,))
self.x_sol = np.zeros((TRAJECTORY_SIZE, 4), dtype=np.float32)
self.v_ego = MIN_SPEED
self.l_lane_change_prob = 0.0
self.r_lane_change_prob = 0.0
self.debug_mode = debug
def update(self, sm):
v_ego_car = sm['carState'].vEgo
# Parse model predictions
md = sm['modelV2']
if len(md.position.x) == TRAJECTORY_SIZE and len(md.velocity.x) == TRAJECTORY_SIZE and len(md.lateralPlannerSolution.x) == TRAJECTORY_SIZE:
self.path_xyz = np.column_stack([md.position.x, md.position.y, md.position.z])
self.velocity_xyz = np.column_stack([md.velocity.x, md.velocity.y, md.velocity.z])
car_speed = np.linalg.norm(self.velocity_xyz, axis=1) - get_speed_error(md, v_ego_car)
self.v_plan = np.clip(car_speed, MIN_SPEED, np.inf)
self.v_ego = self.v_plan[0]
self.x_sol = np.column_stack([md.lateralPlannerSolution.x, md.lateralPlannerSolution.y, md.lateralPlannerSolution.yaw, md.lateralPlannerSolution.yawRate])
# Lane change logic
desire_state = md.meta.desireState
if len(desire_state):
self.l_lane_change_prob = desire_state[log.LateralPlan.Desire.laneChangeLeft]
self.r_lane_change_prob = desire_state[log.LateralPlan.Desire.laneChangeRight]
lane_change_prob = self.l_lane_change_prob + self.r_lane_change_prob
self.DH.update(sm['carState'], sm['carControl'].latActive, lane_change_prob)
def publish(self, sm, pm):
plan_send = messaging.new_message('lateralPlan')
plan_send.valid = sm.all_checks(service_list=['carState', 'controlsState', 'modelV2'])
lateralPlan = plan_send.lateralPlan
lateralPlan.modelMonoTime = sm.logMonoTime['modelV2']
lateralPlan.dPathPoints = self.path_xyz[:,1].tolist()
lateralPlan.psis = self.x_sol[0:CONTROL_N, 2].tolist()
lateralPlan.curvatures = (self.x_sol[0:CONTROL_N, 3]/self.v_ego).tolist()
lateralPlan.curvatureRates = [float(0) for _ in range(CONTROL_N-1)] # TODO: unused
lateralPlan.mpcSolutionValid = bool(1)
lateralPlan.solverExecutionTime = 0.0
if self.debug_mode:
lateralPlan.solverState = log.LateralPlan.SolverState.new_message()
lateralPlan.solverState.x = self.x_sol.tolist()
lateralPlan.desire = self.DH.desire
lateralPlan.useLaneLines = False
lateralPlan.laneChangeState = self.DH.lane_change_state
lateralPlan.laneChangeDirection = self.DH.lane_change_direction
pm.send('lateralPlan', plan_send)

View File

@@ -0,0 +1,132 @@
from cereal import car
from openpilot.common.numpy_fast import clip, interp
from openpilot.common.realtime import DT_CTRL
from openpilot.selfdrive.controls.lib.drive_helpers import CONTROL_N, apply_deadzone
from openpilot.selfdrive.controls.lib.pid import PIDController
from openpilot.selfdrive.modeld.constants import ModelConstants
LongCtrlState = car.CarControl.Actuators.LongControlState
def long_control_state_trans(CP, active, long_control_state, v_ego, v_target,
v_target_1sec, brake_pressed, cruise_standstill):
# Ignore cruise standstill if car has a gas interceptor
cruise_standstill = cruise_standstill and not CP.enableGasInterceptor
accelerating = v_target_1sec > v_target
planned_stop = (v_target < CP.vEgoStopping and
v_target_1sec < CP.vEgoStopping and
not accelerating)
stay_stopped = (v_ego < CP.vEgoStopping and
(brake_pressed or cruise_standstill))
stopping_condition = planned_stop or stay_stopped
starting_condition = (v_target_1sec > CP.vEgoStarting and
accelerating and
not cruise_standstill and
not brake_pressed)
started_condition = v_ego > CP.vEgoStarting
if not active:
long_control_state = LongCtrlState.off
else:
if long_control_state in (LongCtrlState.off, LongCtrlState.pid):
long_control_state = LongCtrlState.pid
if stopping_condition:
long_control_state = LongCtrlState.stopping
elif long_control_state == LongCtrlState.stopping:
if starting_condition and CP.startingState:
long_control_state = LongCtrlState.starting
elif starting_condition:
long_control_state = LongCtrlState.pid
elif long_control_state == LongCtrlState.starting:
if stopping_condition:
long_control_state = LongCtrlState.stopping
elif started_condition:
long_control_state = LongCtrlState.pid
return long_control_state
class LongControl:
def __init__(self, CP):
self.CP = CP
self.long_control_state = LongCtrlState.off # initialized to off
self.pid = PIDController((CP.longitudinalTuning.kpBP, CP.longitudinalTuning.kpV),
(CP.longitudinalTuning.kiBP, CP.longitudinalTuning.kiV),
k_f=CP.longitudinalTuning.kf, rate=1 / DT_CTRL)
self.v_pid = 0.0
self.last_output_accel = 0.0
def reset(self, v_pid):
"""Reset PID controller and change setpoint"""
self.pid.reset()
self.v_pid = v_pid
def update(self, active, CS, long_plan, accel_limits, t_since_plan):
"""Update longitudinal control. This updates the state machine and runs a PID loop"""
# Interp control trajectory
speeds = long_plan.speeds
if len(speeds) == CONTROL_N:
v_target_now = interp(t_since_plan, ModelConstants.T_IDXS[:CONTROL_N], speeds)
a_target_now = interp(t_since_plan, ModelConstants.T_IDXS[:CONTROL_N], long_plan.accels)
v_target_lower = interp(self.CP.longitudinalActuatorDelayLowerBound + t_since_plan, ModelConstants.T_IDXS[:CONTROL_N], speeds)
a_target_lower = 2 * (v_target_lower - v_target_now) / self.CP.longitudinalActuatorDelayLowerBound - a_target_now
v_target_upper = interp(self.CP.longitudinalActuatorDelayUpperBound + t_since_plan, ModelConstants.T_IDXS[:CONTROL_N], speeds)
a_target_upper = 2 * (v_target_upper - v_target_now) / self.CP.longitudinalActuatorDelayUpperBound - a_target_now
v_target = min(v_target_lower, v_target_upper)
a_target = min(a_target_lower, a_target_upper)
v_target_1sec = interp(self.CP.longitudinalActuatorDelayUpperBound + t_since_plan + 1.0, ModelConstants.T_IDXS[:CONTROL_N], speeds)
else:
v_target = 0.0
v_target_now = 0.0
v_target_1sec = 0.0
a_target = 0.0
self.pid.neg_limit = accel_limits[0]
self.pid.pos_limit = accel_limits[1]
output_accel = self.last_output_accel
self.long_control_state = long_control_state_trans(self.CP, active, self.long_control_state, CS.vEgo,
v_target, v_target_1sec, CS.brakePressed,
CS.cruiseState.standstill)
if self.long_control_state == LongCtrlState.off:
self.reset(CS.vEgo)
output_accel = 0.
elif self.long_control_state == LongCtrlState.stopping:
if output_accel > self.CP.stopAccel:
output_accel = min(output_accel, 0.0)
output_accel -= self.CP.stoppingDecelRate * DT_CTRL
self.reset(CS.vEgo)
elif self.long_control_state == LongCtrlState.starting:
output_accel = self.CP.startAccel
self.reset(CS.vEgo)
elif self.long_control_state == LongCtrlState.pid:
self.v_pid = v_target_now
# Toyota starts braking more when it thinks you want to stop
# Freeze the integrator so we don't accelerate to compensate, and don't allow positive acceleration
# TODO too complex, needs to be simplified and tested on toyotas
prevent_overshoot = not self.CP.stoppingControl and CS.vEgo < 1.5 and v_target_1sec < 0.7 and v_target_1sec < self.v_pid
deadzone = interp(CS.vEgo, self.CP.longitudinalTuning.deadzoneBP, self.CP.longitudinalTuning.deadzoneV)
freeze_integrator = prevent_overshoot
error = self.v_pid - CS.vEgo
error_deadzone = apply_deadzone(error, deadzone)
output_accel = self.pid.update(error_deadzone, speed=CS.vEgo,
feedforward=a_target,
freeze_integrator=freeze_integrator)
self.last_output_accel = clip(output_accel, accel_limits[0], accel_limits[1])
return self.last_output_accel

View File

@@ -0,0 +1,2 @@
acados_ocp_long.json
c_generated_code/

View File

@@ -0,0 +1,96 @@
Import('env', 'envCython', 'arch', 'messaging_python', 'common_python')
gen = "c_generated_code"
casadi_model = [
f'{gen}/long_model/long_expl_ode_fun.c',
f'{gen}/long_model/long_expl_vde_forw.c',
]
casadi_cost_y = [
f'{gen}/long_cost/long_cost_y_fun.c',
f'{gen}/long_cost/long_cost_y_fun_jac_ut_xt.c',
f'{gen}/long_cost/long_cost_y_hess.c',
]
casadi_cost_e = [
f'{gen}/long_cost/long_cost_y_e_fun.c',
f'{gen}/long_cost/long_cost_y_e_fun_jac_ut_xt.c',
f'{gen}/long_cost/long_cost_y_e_hess.c',
]
casadi_cost_0 = [
f'{gen}/long_cost/long_cost_y_0_fun.c',
f'{gen}/long_cost/long_cost_y_0_fun_jac_ut_xt.c',
f'{gen}/long_cost/long_cost_y_0_hess.c',
]
casadi_constraints = [
f'{gen}/long_constraints/long_constr_h_fun.c',
f'{gen}/long_constraints/long_constr_h_fun_jac_uxt_zt.c',
]
build_files = [f'{gen}/acados_solver_long.c'] + casadi_model + casadi_cost_y + casadi_cost_e + \
casadi_cost_0 + casadi_constraints
# extra generated files used to trigger a rebuild
generated_files = [
f'{gen}/Makefile',
f'{gen}/main_long.c',
f'{gen}/main_sim_long.c',
f'{gen}/acados_solver_long.h',
f'{gen}/acados_sim_solver_long.h',
f'{gen}/acados_sim_solver_long.c',
f'{gen}/acados_solver.pxd',
f'{gen}/long_model/long_expl_vde_adj.c',
f'{gen}/long_model/long_model.h',
f'{gen}/long_constraints/long_constraints.h',
f'{gen}/long_cost/long_cost.h',
] + build_files
acados_dir = '#third_party/acados'
acados_templates_dir = '#third_party/acados/acados_template/c_templates_tera'
source_list = ['long_mpc.py',
'#selfdrive/modeld/constants.py',
f'{acados_dir}/include/acados_c/ocp_nlp_interface.h',
f'{acados_templates_dir}/acados_solver.in.c',
]
lenv = env.Clone()
lenv.Clean(generated_files, Dir(gen))
generated_long = lenv.Command(generated_files,
source_list,
f"cd {Dir('.').abspath} && python3 long_mpc.py")
lenv.Depends(generated_long, [messaging_python, common_python])
lenv["CFLAGS"].append("-DACADOS_WITH_QPOASES")
lenv["CXXFLAGS"].append("-DACADOS_WITH_QPOASES")
lenv["CCFLAGS"].append("-Wno-unused")
if arch != "Darwin":
lenv["LINKFLAGS"].append("-Wl,--disable-new-dtags")
lib_solver = lenv.SharedLibrary(f"{gen}/acados_ocp_solver_long",
build_files,
LIBS=['m', 'acados', 'hpipm', 'blasfeo', 'qpOASES_e'])
# generate cython stuff
acados_ocp_solver_pyx = File("#third_party/acados/acados_template/acados_ocp_solver_pyx.pyx")
acados_ocp_solver_common = File("#third_party/acados/acados_template/acados_solver_common.pxd")
libacados_ocp_solver_pxd = File(f'{gen}/acados_solver.pxd')
libacados_ocp_solver_c = File(f'{gen}/acados_ocp_solver_pyx.c')
lenv2 = envCython.Clone()
lenv2["LINKFLAGS"] += [lib_solver[0].get_labspath()]
lenv2.Command(libacados_ocp_solver_c,
[acados_ocp_solver_pyx, acados_ocp_solver_common, libacados_ocp_solver_pxd],
f'cython' + \
f' -o {libacados_ocp_solver_c.get_labspath()}' + \
f' -I {libacados_ocp_solver_pxd.get_dir().get_labspath()}' + \
f' -I {acados_ocp_solver_common.get_dir().get_labspath()}' + \
f' {acados_ocp_solver_pyx.get_labspath()}')
lib_cython = lenv2.Program(f'{gen}/acados_ocp_solver_pyx.so', [libacados_ocp_solver_c])
lenv2.Depends(lib_cython, lib_solver)

View File

@@ -0,0 +1,459 @@
#!/usr/bin/env python3
import os
import time
import numpy as np
from cereal import log
from openpilot.common.numpy_fast import clip
from openpilot.common.swaglog import cloudlog
# WARNING: imports outside of constants will not trigger a rebuild
from openpilot.selfdrive.modeld.constants import index_function
from openpilot.selfdrive.car.interfaces import ACCEL_MIN
from openpilot.selfdrive.controls.radard import _LEAD_ACCEL_TAU
if __name__ == '__main__': # generating code
from openpilot.third_party.acados.acados_template import AcadosModel, AcadosOcp, AcadosOcpSolver
else:
from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.c_generated_code.acados_ocp_solver_pyx import AcadosOcpSolverCython
from casadi import SX, vertcat
MODEL_NAME = 'long'
LONG_MPC_DIR = os.path.dirname(os.path.abspath(__file__))
EXPORT_DIR = os.path.join(LONG_MPC_DIR, "c_generated_code")
JSON_FILE = os.path.join(LONG_MPC_DIR, "acados_ocp_long.json")
SOURCES = ['lead0', 'lead1', 'cruise', 'e2e']
X_DIM = 3
U_DIM = 1
PARAM_DIM = 6
COST_E_DIM = 5
COST_DIM = COST_E_DIM + 1
CONSTR_DIM = 4
X_EGO_OBSTACLE_COST = 3.
X_EGO_COST = 0.
V_EGO_COST = 0.
A_EGO_COST = 0.
J_EGO_COST = 5.0
A_CHANGE_COST = 200.
DANGER_ZONE_COST = 100.
CRASH_DISTANCE = .25
LEAD_DANGER_FACTOR = 0.75
LIMIT_COST = 1e6
ACADOS_SOLVER_TYPE = 'SQP_RTI'
# Fewer timestamps don't hurt performance and lead to
# much better convergence of the MPC with low iterations
N = 12
MAX_T = 10.0
T_IDXS_LST = [index_function(idx, max_val=MAX_T, max_idx=N) for idx in range(N+1)]
T_IDXS = np.array(T_IDXS_LST)
FCW_IDXS = T_IDXS < 5.0
T_DIFFS = np.diff(T_IDXS, prepend=[0.])
COMFORT_BRAKE = 2.5
STOP_DISTANCE = 6.0
def get_jerk_factor(personality=log.LongitudinalPersonality.standard):
if personality==log.LongitudinalPersonality.relaxed:
return 1.0
elif personality==log.LongitudinalPersonality.standard:
return 1.0
elif personality==log.LongitudinalPersonality.aggressive:
return 0.5
else:
raise NotImplementedError("Longitudinal personality not supported")
def get_T_FOLLOW(personality=log.LongitudinalPersonality.standard):
if personality==log.LongitudinalPersonality.relaxed:
return 1.75
elif personality==log.LongitudinalPersonality.standard:
return 1.45
elif personality==log.LongitudinalPersonality.aggressive:
return 1.25
else:
raise NotImplementedError("Longitudinal personality not supported")
def get_stopped_equivalence_factor(v_lead):
return (v_lead**2) / (2 * COMFORT_BRAKE)
def get_safe_obstacle_distance(v_ego, t_follow):
return (v_ego**2) / (2 * COMFORT_BRAKE) + t_follow * v_ego + STOP_DISTANCE
def desired_follow_distance(v_ego, v_lead, t_follow=None):
if t_follow is None:
t_follow = get_T_FOLLOW()
return get_safe_obstacle_distance(v_ego, t_follow) - get_stopped_equivalence_factor(v_lead)
def gen_long_model():
model = AcadosModel()
model.name = MODEL_NAME
# set up states & controls
x_ego = SX.sym('x_ego')
v_ego = SX.sym('v_ego')
a_ego = SX.sym('a_ego')
model.x = vertcat(x_ego, v_ego, a_ego)
# controls
j_ego = SX.sym('j_ego')
model.u = vertcat(j_ego)
# xdot
x_ego_dot = SX.sym('x_ego_dot')
v_ego_dot = SX.sym('v_ego_dot')
a_ego_dot = SX.sym('a_ego_dot')
model.xdot = vertcat(x_ego_dot, v_ego_dot, a_ego_dot)
# live parameters
a_min = SX.sym('a_min')
a_max = SX.sym('a_max')
x_obstacle = SX.sym('x_obstacle')
prev_a = SX.sym('prev_a')
lead_t_follow = SX.sym('lead_t_follow')
lead_danger_factor = SX.sym('lead_danger_factor')
model.p = vertcat(a_min, a_max, x_obstacle, prev_a, lead_t_follow, lead_danger_factor)
# dynamics model
f_expl = vertcat(v_ego, a_ego, j_ego)
model.f_impl_expr = model.xdot - f_expl
model.f_expl_expr = f_expl
return model
def gen_long_ocp():
ocp = AcadosOcp()
ocp.model = gen_long_model()
Tf = T_IDXS[-1]
# set dimensions
ocp.dims.N = N
# set cost module
ocp.cost.cost_type = 'NONLINEAR_LS'
ocp.cost.cost_type_e = 'NONLINEAR_LS'
QR = np.zeros((COST_DIM, COST_DIM))
Q = np.zeros((COST_E_DIM, COST_E_DIM))
ocp.cost.W = QR
ocp.cost.W_e = Q
x_ego, v_ego, a_ego = ocp.model.x[0], ocp.model.x[1], ocp.model.x[2]
j_ego = ocp.model.u[0]
a_min, a_max = ocp.model.p[0], ocp.model.p[1]
x_obstacle = ocp.model.p[2]
prev_a = ocp.model.p[3]
lead_t_follow = ocp.model.p[4]
lead_danger_factor = ocp.model.p[5]
ocp.cost.yref = np.zeros((COST_DIM, ))
ocp.cost.yref_e = np.zeros((COST_E_DIM, ))
desired_dist_comfort = get_safe_obstacle_distance(v_ego, lead_t_follow)
# The main cost in normal operation is how close you are to the "desired" distance
# from an obstacle at every timestep. This obstacle can be a lead car
# or other object. In e2e mode we can use x_position targets as a cost
# instead.
costs = [((x_obstacle - x_ego) - (desired_dist_comfort)) / (v_ego + 10.),
x_ego,
v_ego,
a_ego,
a_ego - prev_a,
j_ego]
ocp.model.cost_y_expr = vertcat(*costs)
ocp.model.cost_y_expr_e = vertcat(*costs[:-1])
# Constraints on speed, acceleration and desired distance to
# the obstacle, which is treated as a slack constraint so it
# behaves like an asymmetrical cost.
constraints = vertcat(v_ego,
(a_ego - a_min),
(a_max - a_ego),
((x_obstacle - x_ego) - lead_danger_factor * (desired_dist_comfort)) / (v_ego + 10.))
ocp.model.con_h_expr = constraints
x0 = np.zeros(X_DIM)
ocp.constraints.x0 = x0
ocp.parameter_values = np.array([-1.2, 1.2, 0.0, 0.0, get_T_FOLLOW(), LEAD_DANGER_FACTOR])
# We put all constraint cost weights to 0 and only set them at runtime
cost_weights = np.zeros(CONSTR_DIM)
ocp.cost.zl = cost_weights
ocp.cost.Zl = cost_weights
ocp.cost.Zu = cost_weights
ocp.cost.zu = cost_weights
ocp.constraints.lh = np.zeros(CONSTR_DIM)
ocp.constraints.uh = 1e4*np.ones(CONSTR_DIM)
ocp.constraints.idxsh = np.arange(CONSTR_DIM)
# The HPIPM solver can give decent solutions even when it is stopped early
# Which is critical for our purpose where compute time is strictly bounded
# We use HPIPM in the SPEED_ABS mode, which ensures fastest runtime. This
# does not cause issues since the problem is well bounded.
ocp.solver_options.qp_solver = 'PARTIAL_CONDENSING_HPIPM'
ocp.solver_options.hessian_approx = 'GAUSS_NEWTON'
ocp.solver_options.integrator_type = 'ERK'
ocp.solver_options.nlp_solver_type = ACADOS_SOLVER_TYPE
ocp.solver_options.qp_solver_cond_N = 1
# More iterations take too much time and less lead to inaccurate convergence in
# some situations. Ideally we would run just 1 iteration to ensure fixed runtime.
ocp.solver_options.qp_solver_iter_max = 10
ocp.solver_options.qp_tol = 1e-3
# set prediction horizon
ocp.solver_options.tf = Tf
ocp.solver_options.shooting_nodes = T_IDXS
ocp.code_export_directory = EXPORT_DIR
return ocp
class LongitudinalMpc:
def __init__(self, mode='acc'):
self.mode = mode
self.solver = AcadosOcpSolverCython(MODEL_NAME, ACADOS_SOLVER_TYPE, N)
self.reset()
self.source = SOURCES[2]
def reset(self):
# self.solver = AcadosOcpSolverCython(MODEL_NAME, ACADOS_SOLVER_TYPE, N)
self.solver.reset()
# self.solver.options_set('print_level', 2)
self.v_solution = np.zeros(N+1)
self.a_solution = np.zeros(N+1)
self.prev_a = np.array(self.a_solution)
self.j_solution = np.zeros(N)
self.yref = np.zeros((N+1, COST_DIM))
for i in range(N):
self.solver.cost_set(i, "yref", self.yref[i])
self.solver.cost_set(N, "yref", self.yref[N][:COST_E_DIM])
self.x_sol = np.zeros((N+1, X_DIM))
self.u_sol = np.zeros((N,1))
self.params = np.zeros((N+1, PARAM_DIM))
for i in range(N+1):
self.solver.set(i, 'x', np.zeros(X_DIM))
self.last_cloudlog_t = 0
self.status = False
self.crash_cnt = 0.0
self.solution_status = 0
# timers
self.solve_time = 0.0
self.time_qp_solution = 0.0
self.time_linearization = 0.0
self.time_integrator = 0.0
self.x0 = np.zeros(X_DIM)
self.set_weights()
def set_cost_weights(self, cost_weights, constraint_cost_weights):
W = np.asfortranarray(np.diag(cost_weights))
for i in range(N):
# TODO don't hardcode A_CHANGE_COST idx
# reduce the cost on (a-a_prev) later in the horizon.
W[4,4] = cost_weights[4] * np.interp(T_IDXS[i], [0.0, 1.0, 2.0], [1.0, 1.0, 0.0])
self.solver.cost_set(i, 'W', W)
# Setting the slice without the copy make the array not contiguous,
# causing issues with the C interface.
self.solver.cost_set(N, 'W', np.copy(W[:COST_E_DIM, :COST_E_DIM]))
# Set L2 slack cost on lower bound constraints
Zl = np.array(constraint_cost_weights)
for i in range(N):
self.solver.cost_set(i, 'Zl', Zl)
def set_weights(self, prev_accel_constraint=True, personality=log.LongitudinalPersonality.standard):
jerk_factor = get_jerk_factor(personality)
if self.mode == 'acc':
a_change_cost = A_CHANGE_COST if prev_accel_constraint else 0
cost_weights = [X_EGO_OBSTACLE_COST, X_EGO_COST, V_EGO_COST, A_EGO_COST, jerk_factor * a_change_cost, jerk_factor * J_EGO_COST]
constraint_cost_weights = [LIMIT_COST, LIMIT_COST, LIMIT_COST, DANGER_ZONE_COST]
elif self.mode == 'blended':
a_change_cost = 40.0 if prev_accel_constraint else 0
cost_weights = [0., 0.1, 0.2, 5.0, a_change_cost, 1.0]
constraint_cost_weights = [LIMIT_COST, LIMIT_COST, LIMIT_COST, 50.0]
else:
raise NotImplementedError(f'Planner mode {self.mode} not recognized in planner cost set')
self.set_cost_weights(cost_weights, constraint_cost_weights)
def set_cur_state(self, v, a):
v_prev = self.x0[1]
self.x0[1] = v
self.x0[2] = a
if abs(v_prev - v) > 2.: # probably only helps if v < v_prev
for i in range(N+1):
self.solver.set(i, 'x', self.x0)
@staticmethod
def extrapolate_lead(x_lead, v_lead, a_lead, a_lead_tau):
a_lead_traj = a_lead * np.exp(-a_lead_tau * (T_IDXS**2)/2.)
v_lead_traj = np.clip(v_lead + np.cumsum(T_DIFFS * a_lead_traj), 0.0, 1e8)
x_lead_traj = x_lead + np.cumsum(T_DIFFS * v_lead_traj)
lead_xv = np.column_stack((x_lead_traj, v_lead_traj))
return lead_xv
def process_lead(self, lead):
v_ego = self.x0[1]
if lead is not None and lead.status:
x_lead = lead.dRel
v_lead = lead.vLead
a_lead = lead.aLeadK
a_lead_tau = lead.aLeadTau
else:
# Fake a fast lead car, so mpc can keep running in the same mode
x_lead = 50.0
v_lead = v_ego + 10.0
a_lead = 0.0
a_lead_tau = _LEAD_ACCEL_TAU
# MPC will not converge if immediate crash is expected
# Clip lead distance to what is still possible to brake for
min_x_lead = ((v_ego + v_lead)/2) * (v_ego - v_lead) / (-ACCEL_MIN * 2)
x_lead = clip(x_lead, min_x_lead, 1e8)
v_lead = clip(v_lead, 0.0, 1e8)
a_lead = clip(a_lead, -10., 5.)
lead_xv = self.extrapolate_lead(x_lead, v_lead, a_lead, a_lead_tau)
return lead_xv
def set_accel_limits(self, min_a, max_a):
# TODO this sets a max accel limit, but the minimum limit is only for cruise decel
# needs refactor
self.cruise_min_a = min_a
self.max_a = max_a
def update(self, radarstate, v_cruise, x, v, a, j, personality=log.LongitudinalPersonality.standard):
t_follow = get_T_FOLLOW(personality)
v_ego = self.x0[1]
self.status = radarstate.leadOne.status or radarstate.leadTwo.status
lead_xv_0 = self.process_lead(radarstate.leadOne)
lead_xv_1 = self.process_lead(radarstate.leadTwo)
# To estimate a safe distance from a moving lead, we calculate how much stopping
# distance that lead needs as a minimum. We can add that to the current distance
# and then treat that as a stopped car/obstacle at this new distance.
lead_0_obstacle = lead_xv_0[:,0] + get_stopped_equivalence_factor(lead_xv_0[:,1])
lead_1_obstacle = lead_xv_1[:,0] + get_stopped_equivalence_factor(lead_xv_1[:,1])
self.params[:,0] = ACCEL_MIN
self.params[:,1] = self.max_a
# Update in ACC mode or ACC/e2e blend
if self.mode == 'acc':
self.params[:,5] = LEAD_DANGER_FACTOR
# Fake an obstacle for cruise, this ensures smooth acceleration to set speed
# when the leads are no factor.
v_lower = v_ego + (T_IDXS * self.cruise_min_a * 1.05)
v_upper = v_ego + (T_IDXS * self.max_a * 1.05)
v_cruise_clipped = np.clip(v_cruise * np.ones(N+1),
v_lower,
v_upper)
cruise_obstacle = np.cumsum(T_DIFFS * v_cruise_clipped) + get_safe_obstacle_distance(v_cruise_clipped, t_follow)
x_obstacles = np.column_stack([lead_0_obstacle, lead_1_obstacle, cruise_obstacle])
self.source = SOURCES[np.argmin(x_obstacles[0])]
# These are not used in ACC mode
x[:], v[:], a[:], j[:] = 0.0, 0.0, 0.0, 0.0
elif self.mode == 'blended':
self.params[:,5] = 1.0
x_obstacles = np.column_stack([lead_0_obstacle,
lead_1_obstacle])
cruise_target = T_IDXS * np.clip(v_cruise, v_ego - 2.0, 1e3) + x[0]
xforward = ((v[1:] + v[:-1]) / 2) * (T_IDXS[1:] - T_IDXS[:-1])
x = np.cumsum(np.insert(xforward, 0, x[0]))
x_and_cruise = np.column_stack([x, cruise_target])
x = np.min(x_and_cruise, axis=1)
self.source = 'e2e' if x_and_cruise[1,0] < x_and_cruise[1,1] else 'cruise'
else:
raise NotImplementedError(f'Planner mode {self.mode} not recognized in planner update')
self.yref[:,1] = x
self.yref[:,2] = v
self.yref[:,3] = a
self.yref[:,5] = j
for i in range(N):
self.solver.set(i, "yref", self.yref[i])
self.solver.set(N, "yref", self.yref[N][:COST_E_DIM])
self.params[:,2] = np.min(x_obstacles, axis=1)
self.params[:,3] = np.copy(self.prev_a)
self.params[:,4] = t_follow
self.run()
if (np.any(lead_xv_0[FCW_IDXS,0] - self.x_sol[FCW_IDXS,0] < CRASH_DISTANCE) and
radarstate.leadOne.modelProb > 0.9):
self.crash_cnt += 1
else:
self.crash_cnt = 0
# Check if it got within lead comfort range
# TODO This should be done cleaner
if self.mode == 'blended':
if any((lead_0_obstacle - get_safe_obstacle_distance(self.x_sol[:,1], t_follow))- self.x_sol[:,0] < 0.0):
self.source = 'lead0'
if any((lead_1_obstacle - get_safe_obstacle_distance(self.x_sol[:,1], t_follow))- self.x_sol[:,0] < 0.0) and \
(lead_1_obstacle[0] - lead_0_obstacle[0]):
self.source = 'lead1'
def run(self):
# t0 = time.monotonic()
# reset = 0
for i in range(N+1):
self.solver.set(i, 'p', self.params[i])
self.solver.constraints_set(0, "lbx", self.x0)
self.solver.constraints_set(0, "ubx", self.x0)
self.solution_status = self.solver.solve()
self.solve_time = float(self.solver.get_stats('time_tot')[0])
self.time_qp_solution = float(self.solver.get_stats('time_qp')[0])
self.time_linearization = float(self.solver.get_stats('time_lin')[0])
self.time_integrator = float(self.solver.get_stats('time_sim')[0])
# qp_iter = self.solver.get_stats('statistics')[-1][-1] # SQP_RTI specific
# print(f"long_mpc timings: tot {self.solve_time:.2e}, qp {self.time_qp_solution:.2e}, lin {self.time_linearization:.2e}, \
# integrator {self.time_integrator:.2e}, qp_iter {qp_iter}")
# res = self.solver.get_residuals()
# print(f"long_mpc residuals: {res[0]:.2e}, {res[1]:.2e}, {res[2]:.2e}, {res[3]:.2e}")
# self.solver.print_statistics()
for i in range(N+1):
self.x_sol[i] = self.solver.get(i, 'x')
for i in range(N):
self.u_sol[i] = self.solver.get(i, 'u')
self.v_solution = self.x_sol[:,1]
self.a_solution = self.x_sol[:,2]
self.j_solution = self.u_sol[:,0]
self.prev_a = np.interp(T_IDXS + 0.05, T_IDXS, self.a_solution)
t = time.monotonic()
if self.solution_status != 0:
if t > self.last_cloudlog_t + 5.0:
self.last_cloudlog_t = t
cloudlog.warning(f"Long mpc reset, solution_status: {self.solution_status}")
self.reset()
# reset = 1
# print(f"long_mpc timings: total internal {self.solve_time:.2e}, external: {(time.monotonic() - t0):.2e} qp {self.time_qp_solution:.2e}, \
# lin {self.time_linearization:.2e} qp_iter {qp_iter}, reset {reset}")
if __name__ == "__main__":
ocp = gen_long_ocp()
AcadosOcpSolver.generate(ocp, json_file=JSON_FILE)
# AcadosOcpSolver.build(ocp.code_export_directory, with_cython=True)

View File

@@ -0,0 +1,175 @@
#!/usr/bin/env python3
import math
import numpy as np
from openpilot.common.numpy_fast import clip, interp
from openpilot.common.params import Params
from cereal import log
import cereal.messaging as messaging
from openpilot.common.conversions import Conversions as CV
from openpilot.common.filter_simple import FirstOrderFilter
from openpilot.common.realtime import DT_MDL
from openpilot.selfdrive.modeld.constants import ModelConstants
from openpilot.selfdrive.car.interfaces import ACCEL_MIN, ACCEL_MAX
from openpilot.selfdrive.controls.lib.longcontrol import LongCtrlState
from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import LongitudinalMpc
from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import T_IDXS as T_IDXS_MPC
from openpilot.selfdrive.controls.lib.drive_helpers import V_CRUISE_MAX, CONTROL_N, get_speed_error
from openpilot.common.swaglog import cloudlog
LON_MPC_STEP = 0.2 # first step is 0.2s
A_CRUISE_MIN = -1.2
A_CRUISE_MAX_VALS = [1.6, 1.2, 0.8, 0.6]
A_CRUISE_MAX_BP = [0., 10.0, 25., 40.]
# Lookup table for turns
_A_TOTAL_MAX_V = [1.7, 3.2]
_A_TOTAL_MAX_BP = [20., 40.]
def get_max_accel(v_ego):
return interp(v_ego, A_CRUISE_MAX_BP, A_CRUISE_MAX_VALS)
def limit_accel_in_turns(v_ego, angle_steers, a_target, CP):
"""
This function returns a limited long acceleration allowed, depending on the existing lateral acceleration
this should avoid accelerating when losing the target in turns
"""
# FIXME: This function to calculate lateral accel is incorrect and should use the VehicleModel
# The lookup table for turns should also be updated if we do this
a_total_max = interp(v_ego, _A_TOTAL_MAX_BP, _A_TOTAL_MAX_V)
a_y = v_ego ** 2 * angle_steers * CV.DEG_TO_RAD / (CP.steerRatio * CP.wheelbase)
a_x_allowed = math.sqrt(max(a_total_max ** 2 - a_y ** 2, 0.))
return [a_target[0], min(a_target[1], a_x_allowed)]
class LongitudinalPlanner:
def __init__(self, CP, init_v=0.0, init_a=0.0, dt=DT_MDL):
self.CP = CP
self.mpc = LongitudinalMpc()
self.fcw = False
self.dt = dt
self.a_desired = init_a
self.v_desired_filter = FirstOrderFilter(init_v, 2.0, self.dt)
self.v_model_error = 0.0
self.v_desired_trajectory = np.zeros(CONTROL_N)
self.a_desired_trajectory = np.zeros(CONTROL_N)
self.j_desired_trajectory = np.zeros(CONTROL_N)
self.solverExecutionTime = 0.0
self.params = Params()
self.param_read_counter = 0
self.read_param()
self.personality = log.LongitudinalPersonality.standard
def read_param(self):
try:
self.personality = int(self.params.get('LongitudinalPersonality'))
except (ValueError, TypeError):
self.personality = log.LongitudinalPersonality.standard
@staticmethod
def parse_model(model_msg, model_error):
if (len(model_msg.position.x) == 33 and
len(model_msg.velocity.x) == 33 and
len(model_msg.acceleration.x) == 33):
x = np.interp(T_IDXS_MPC, ModelConstants.T_IDXS, model_msg.position.x) - model_error * T_IDXS_MPC
v = np.interp(T_IDXS_MPC, ModelConstants.T_IDXS, model_msg.velocity.x) - model_error
a = np.interp(T_IDXS_MPC, ModelConstants.T_IDXS, model_msg.acceleration.x)
j = np.zeros(len(T_IDXS_MPC))
else:
x = np.zeros(len(T_IDXS_MPC))
v = np.zeros(len(T_IDXS_MPC))
a = np.zeros(len(T_IDXS_MPC))
j = np.zeros(len(T_IDXS_MPC))
return x, v, a, j
def update(self, sm):
if self.param_read_counter % 50 == 0:
self.read_param()
self.param_read_counter += 1
self.mpc.mode = 'blended' if sm['controlsState'].experimentalMode else 'acc'
v_ego = sm['carState'].vEgo
v_cruise_kph = min(sm['controlsState'].vCruise, V_CRUISE_MAX)
v_cruise = v_cruise_kph * CV.KPH_TO_MS
long_control_off = sm['controlsState'].longControlState == LongCtrlState.off
force_slow_decel = sm['controlsState'].forceDecel
# Reset current state when not engaged, or user is controlling the speed
reset_state = long_control_off if self.CP.openpilotLongitudinalControl else not sm['controlsState'].enabled
# No change cost when user is controlling the speed, or when standstill
prev_accel_constraint = not (reset_state or sm['carState'].standstill)
if self.mpc.mode == 'acc':
accel_limits = [A_CRUISE_MIN, get_max_accel(v_ego)]
accel_limits_turns = limit_accel_in_turns(v_ego, sm['carState'].steeringAngleDeg, accel_limits, self.CP)
else:
accel_limits = [ACCEL_MIN, ACCEL_MAX]
accel_limits_turns = [ACCEL_MIN, ACCEL_MAX]
if reset_state:
self.v_desired_filter.x = v_ego
# Clip aEgo to cruise limits to prevent large accelerations when becoming active
self.a_desired = clip(sm['carState'].aEgo, accel_limits[0], accel_limits[1])
# Prevent divergence, smooth in current v_ego
self.v_desired_filter.x = max(0.0, self.v_desired_filter.update(v_ego))
# Compute model v_ego error
self.v_model_error = get_speed_error(sm['modelV2'], v_ego)
if force_slow_decel:
v_cruise = 0.0
# clip limits, cannot init MPC outside of bounds
accel_limits_turns[0] = min(accel_limits_turns[0], self.a_desired + 0.05)
accel_limits_turns[1] = max(accel_limits_turns[1], self.a_desired - 0.05)
self.mpc.set_weights(prev_accel_constraint, personality=self.personality)
self.mpc.set_accel_limits(accel_limits_turns[0], accel_limits_turns[1])
self.mpc.set_cur_state(self.v_desired_filter.x, self.a_desired)
x, v, a, j = self.parse_model(sm['modelV2'], self.v_model_error)
self.mpc.update(sm['radarState'], v_cruise, x, v, a, j, personality=self.personality)
self.v_desired_trajectory_full = np.interp(ModelConstants.T_IDXS, T_IDXS_MPC, self.mpc.v_solution)
self.a_desired_trajectory_full = np.interp(ModelConstants.T_IDXS, T_IDXS_MPC, self.mpc.a_solution)
self.v_desired_trajectory = self.v_desired_trajectory_full[:CONTROL_N]
self.a_desired_trajectory = self.a_desired_trajectory_full[:CONTROL_N]
self.j_desired_trajectory = np.interp(ModelConstants.T_IDXS[:CONTROL_N], T_IDXS_MPC[:-1], self.mpc.j_solution)
# TODO counter is only needed because radar is glitchy, remove once radar is gone
self.fcw = self.mpc.crash_cnt > 2 and not sm['carState'].standstill
if self.fcw:
cloudlog.info("FCW triggered")
# Interpolate 0.05 seconds and save as starting point for next iteration
a_prev = self.a_desired
self.a_desired = float(interp(self.dt, ModelConstants.T_IDXS[:CONTROL_N], self.a_desired_trajectory))
self.v_desired_filter.x = self.v_desired_filter.x + self.dt * (self.a_desired + a_prev) / 2.0
def publish(self, sm, pm):
plan_send = messaging.new_message('longitudinalPlan')
plan_send.valid = sm.all_checks(service_list=['carState', 'controlsState'])
longitudinalPlan = plan_send.longitudinalPlan
longitudinalPlan.modelMonoTime = sm.logMonoTime['modelV2']
longitudinalPlan.processingDelay = (plan_send.logMonoTime / 1e9) - sm.logMonoTime['modelV2']
longitudinalPlan.speeds = self.v_desired_trajectory.tolist()
longitudinalPlan.accels = self.a_desired_trajectory.tolist()
longitudinalPlan.jerks = self.j_desired_trajectory.tolist()
longitudinalPlan.hasLead = sm['radarState'].leadOne.status
longitudinalPlan.longitudinalPlanSource = self.mpc.source
longitudinalPlan.fcw = self.fcw
longitudinalPlan.solverExecutionTime = self.mpc.solve_time
longitudinalPlan.personality = self.personality
pm.send('longitudinalPlan', plan_send)

View File

@@ -0,0 +1,75 @@
import numpy as np
from numbers import Number
from openpilot.common.numpy_fast import clip, interp
class PIDController():
def __init__(self, k_p, k_i, k_f=0., k_d=0., pos_limit=1e308, neg_limit=-1e308, rate=100):
self._k_p = k_p
self._k_i = k_i
self._k_d = k_d
self.k_f = k_f # feedforward gain
if isinstance(self._k_p, Number):
self._k_p = [[0], [self._k_p]]
if isinstance(self._k_i, Number):
self._k_i = [[0], [self._k_i]]
if isinstance(self._k_d, Number):
self._k_d = [[0], [self._k_d]]
self.pos_limit = pos_limit
self.neg_limit = neg_limit
self.i_unwind_rate = 0.3 / rate
self.i_rate = 1.0 / rate
self.speed = 0.0
self.reset()
@property
def k_p(self):
return interp(self.speed, self._k_p[0], self._k_p[1])
@property
def k_i(self):
return interp(self.speed, self._k_i[0], self._k_i[1])
@property
def k_d(self):
return interp(self.speed, self._k_d[0], self._k_d[1])
@property
def error_integral(self):
return self.i/self.k_i
def reset(self):
self.p = 0.0
self.i = 0.0
self.d = 0.0
self.f = 0.0
self.control = 0
def update(self, error, error_rate=0.0, speed=0.0, override=False, feedforward=0., freeze_integrator=False):
self.speed = speed
self.p = float(error) * self.k_p
self.f = feedforward * self.k_f
self.d = error_rate * self.k_d
if override:
self.i -= self.i_unwind_rate * float(np.sign(self.i))
else:
i = self.i + error * self.k_i * self.i_rate
control = self.p + i + self.d + self.f
# Update when changing i will move the control away from the limits
# or when i will move towards the sign of the error
if ((error >= 0 and (control <= self.pos_limit or i < 0.0)) or
(error <= 0 and (control >= self.neg_limit or i > 0.0))) and \
not freeze_integrator:
self.i = i
control = self.p + self.i + self.d + self.f
self.control = clip(control, self.neg_limit, self.pos_limit)
return self.control

View File

@@ -0,0 +1,231 @@
#!/usr/bin/env python3
"""
Dynamic bicycle model from "The Science of Vehicle Dynamics (2014), M. Guiggiani"
The state is x = [v, r]^T
with v lateral speed [m/s], and r rotational speed [rad/s]
The input u is the steering angle [rad], and roll [rad]
The system is defined by
x_dot = A*x + B*u
A depends on longitudinal speed, u [m/s], and vehicle parameters CP
"""
from typing import Tuple
import numpy as np
from numpy.linalg import solve
from cereal import car
ACCELERATION_DUE_TO_GRAVITY = 9.8
class VehicleModel:
def __init__(self, CP: car.CarParams):
"""
Args:
CP: Car Parameters
"""
# for math readability, convert long names car params into short names
self.m: float = CP.mass
self.j: float = CP.rotationalInertia
self.l: float = CP.wheelbase
self.aF: float = CP.centerToFront
self.aR: float = CP.wheelbase - CP.centerToFront
self.chi: float = CP.steerRatioRear
self.cF_orig: float = CP.tireStiffnessFront
self.cR_orig: float = CP.tireStiffnessRear
self.update_params(1.0, CP.steerRatio)
def update_params(self, stiffness_factor: float, steer_ratio: float) -> None:
"""Update the vehicle model with a new stiffness factor and steer ratio"""
self.cF: float = stiffness_factor * self.cF_orig
self.cR: float = stiffness_factor * self.cR_orig
self.sR: float = steer_ratio
def steady_state_sol(self, sa: float, u: float, roll: float) -> np.ndarray:
"""Returns the steady state solution.
If the speed is too low we can't use the dynamic model (tire slip is undefined),
we then have to use the kinematic model
Args:
sa: Steering wheel angle [rad]
u: Speed [m/s]
roll: Road Roll [rad]
Returns:
2x1 matrix with steady state solution (lateral speed, rotational speed)
"""
if u > 0.1:
return dyn_ss_sol(sa, u, roll, self)
else:
return kin_ss_sol(sa, u, self)
def calc_curvature(self, sa: float, u: float, roll: float) -> float:
"""Returns the curvature. Multiplied by the speed this will give the yaw rate.
Args:
sa: Steering wheel angle [rad]
u: Speed [m/s]
roll: Road Roll [rad]
Returns:
Curvature factor [1/m]
"""
return (self.curvature_factor(u) * sa / self.sR) + self.roll_compensation(roll, u)
def curvature_factor(self, u: float) -> float:
"""Returns the curvature factor.
Multiplied by wheel angle (not steering wheel angle) this will give the curvature.
Args:
u: Speed [m/s]
Returns:
Curvature factor [1/m]
"""
sf = calc_slip_factor(self)
return (1. - self.chi) / (1. - sf * u**2) / self.l
def get_steer_from_curvature(self, curv: float, u: float, roll: float) -> float:
"""Calculates the required steering wheel angle for a given curvature
Args:
curv: Desired curvature [1/m]
u: Speed [m/s]
roll: Road Roll [rad]
Returns:
Steering wheel angle [rad]
"""
return (curv - self.roll_compensation(roll, u)) * self.sR * 1.0 / self.curvature_factor(u)
def roll_compensation(self, roll: float, u: float) -> float:
"""Calculates the roll-compensation to curvature
Args:
roll: Road Roll [rad]
u: Speed [m/s]
Returns:
Roll compensation curvature [rad]
"""
sf = calc_slip_factor(self)
if abs(sf) < 1e-6:
return 0
else:
return (ACCELERATION_DUE_TO_GRAVITY * roll) / ((1 / sf) - u**2)
def get_steer_from_yaw_rate(self, yaw_rate: float, u: float, roll: float) -> float:
"""Calculates the required steering wheel angle for a given yaw_rate
Args:
yaw_rate: Desired yaw rate [rad/s]
u: Speed [m/s]
roll: Road Roll [rad]
Returns:
Steering wheel angle [rad]
"""
curv = yaw_rate / u
return self.get_steer_from_curvature(curv, u, roll)
def yaw_rate(self, sa: float, u: float, roll: float) -> float:
"""Calculate yaw rate
Args:
sa: Steering wheel angle [rad]
u: Speed [m/s]
roll: Road Roll [rad]
Returns:
Yaw rate [rad/s]
"""
return self.calc_curvature(sa, u, roll) * u
def kin_ss_sol(sa: float, u: float, VM: VehicleModel) -> np.ndarray:
"""Calculate the steady state solution at low speeds
At low speeds the tire slip is undefined, so a kinematic
model is used.
Args:
sa: Steering angle [rad]
u: Speed [m/s]
VM: Vehicle model
Returns:
2x1 matrix with steady state solution
"""
K = np.zeros((2, 1))
K[0, 0] = VM.aR / VM.sR / VM.l * u
K[1, 0] = 1. / VM.sR / VM.l * u
return K * sa
def create_dyn_state_matrices(u: float, VM: VehicleModel) -> Tuple[np.ndarray, np.ndarray]:
"""Returns the A and B matrix for the dynamics system
Args:
u: Vehicle speed [m/s]
VM: Vehicle model
Returns:
A tuple with the 2x2 A matrix, and 2x2 B matrix
Parameters in the vehicle model:
cF: Tire stiffness Front [N/rad]
cR: Tire stiffness Front [N/rad]
aF: Distance from CG to front wheels [m]
aR: Distance from CG to rear wheels [m]
m: Mass [kg]
j: Rotational inertia [kg m^2]
sR: Steering ratio [-]
chi: Steer ratio rear [-]
"""
A = np.zeros((2, 2))
B = np.zeros((2, 2))
A[0, 0] = - (VM.cF + VM.cR) / (VM.m * u)
A[0, 1] = - (VM.cF * VM.aF - VM.cR * VM.aR) / (VM.m * u) - u
A[1, 0] = - (VM.cF * VM.aF - VM.cR * VM.aR) / (VM.j * u)
A[1, 1] = - (VM.cF * VM.aF**2 + VM.cR * VM.aR**2) / (VM.j * u)
# Steering input
B[0, 0] = (VM.cF + VM.chi * VM.cR) / VM.m / VM.sR
B[1, 0] = (VM.cF * VM.aF - VM.chi * VM.cR * VM.aR) / VM.j / VM.sR
# Roll input
B[0, 1] = -ACCELERATION_DUE_TO_GRAVITY
return A, B
def dyn_ss_sol(sa: float, u: float, roll: float, VM: VehicleModel) -> np.ndarray:
"""Calculate the steady state solution when x_dot = 0,
Ax + Bu = 0 => x = -A^{-1} B u
Args:
sa: Steering angle [rad]
u: Speed [m/s]
roll: Road Roll [rad]
VM: Vehicle model
Returns:
2x1 matrix with steady state solution
"""
A, B = create_dyn_state_matrices(u, VM)
inp = np.array([[sa], [roll]])
return -solve(A, B) @ inp # type: ignore
def calc_slip_factor(VM: VehicleModel) -> float:
"""The slip factor is a measure of how the curvature changes with speed
it's positive for Oversteering vehicle, negative (usual case) otherwise.
"""
return VM.m * (VM.cF * VM.aF - VM.cR * VM.aR) / (VM.l**2 * VM.cF * VM.cR)