This commit is contained in:
Your Name
2024-04-27 13:44:34 -05:00
parent ea1aad5ed1
commit 2fbe9dbea1
25 changed files with 1501 additions and 532 deletions

2
selfdrive/controls/.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
calibration_param
traces

File diff suppressed because it is too large Load Diff

View File

@@ -3,7 +3,6 @@ import os
import json
from collections import defaultdict
from dataclasses import dataclass
from typing import List, Dict, Optional
from openpilot.common.basedir import BASEDIR
from openpilot.common.params import Params
@@ -14,7 +13,7 @@ with open(os.path.join(BASEDIR, "selfdrive/controls/lib/alerts_offroad.json")) a
OFFROAD_ALERTS = json.load(f)
def set_offroad_alert(alert: str, show_alert: bool, extra_text: Optional[str] = None) -> None:
def set_offroad_alert(alert: str, show_alert: bool, extra_text: str = None) -> None:
if show_alert:
a = copy.copy(OFFROAD_ALERTS[alert])
a['extra'] = extra_text or ''
@@ -25,7 +24,7 @@ def set_offroad_alert(alert: str, show_alert: bool, extra_text: Optional[str] =
@dataclass
class AlertEntry:
alert: Optional[Alert] = None
alert: Alert | None = None
start_frame: int = -1
end_frame: int = -1
@@ -34,9 +33,9 @@ class AlertEntry:
class AlertManager:
def __init__(self):
self.alerts: Dict[str, AlertEntry] = defaultdict(AlertEntry)
self.alerts: dict[str, AlertEntry] = defaultdict(AlertEntry)
def add_many(self, frame: int, alerts: List[Alert]) -> None:
def add_many(self, frame: int, alerts: list[Alert]) -> None:
for alert in alerts:
entry = self.alerts[alert.alert_type]
entry.alert = alert
@@ -45,7 +44,7 @@ class AlertManager:
min_end_frame = entry.start_frame + alert.duration
entry.end_frame = max(frame + 1, min_end_frame)
def process_alerts(self, frame: int, clear_event_types: set) -> Optional[Alert]:
def process_alerts(self, frame: int, clear_event_types: set) -> Alert | None:
current_alert = AlertEntry()
for v in self.alerts.values():
if not v.alert:

View File

@@ -37,7 +37,6 @@ TURN_DESIRES = {
TurnDirection.turnRight: log.Desire.turnRight,
}
class DesireHelper:
def __init__(self):
self.lane_change_state = LaneChangeState.off
@@ -70,7 +69,6 @@ class DesireHelper:
lane_available = True
else:
desired_lane = frogpilotPlan.laneWidthLeft if carstate.leftBlinker else frogpilotPlan.laneWidthRight
# Check if the lane width exceeds the threshold
lane_available = desired_lane >= self.lane_detection_width
if not lateral_active or self.lane_change_timer > LANE_CHANGE_TIME_MAX:
@@ -82,7 +80,6 @@ class DesireHelper:
# Set the "turn_completed" flag to prevent lane changes after completing a turn
self.turn_completed = True
else:
# TurnDirection.turnLeft / turnRight
self.turn_direction = TurnDirection.none
# LaneChangeState.off
@@ -104,19 +101,17 @@ class DesireHelper:
blindspot_detected = ((carstate.leftBlindspot and self.lane_change_direction == LaneChangeDirection.left) or
(carstate.rightBlindspot and self.lane_change_direction == LaneChangeDirection.right))
# Conduct a nudgeless lane change if all the conditions are met
self.lane_change_wait_timer += DT_MDL
if self.nudgeless and lane_available and not self.lane_change_completed and self.lane_change_wait_timer >= self.lane_change_delay:
self.lane_change_wait_timer = 0
torque_applied = True
self.lane_change_wait_timer = 0
if not one_blinker or below_lane_change_speed:
self.lane_change_state = LaneChangeState.off
self.lane_change_direction = LaneChangeDirection.none
elif torque_applied and not blindspot_detected:
# Set the "lane_change_completed" flag to prevent any more lane changes if the toggle is on
self.lane_change_completed = self.one_lane_change
self.lane_change_state = LaneChangeState.laneChangeStarting
self.lane_change_completed = True if self.one_lane_change else False
# LaneChangeState.laneChangeStarting
elif self.lane_change_state == LaneChangeState.laneChangeStarting:
@@ -144,10 +139,8 @@ class DesireHelper:
else:
self.lane_change_timer += DT_MDL
self.prev_one_blinker = one_blinker
# Reset the flags
self.lane_change_completed &= one_blinker
self.prev_one_blinker = one_blinker
self.turn_completed &= one_blinker
if self.turn_direction != TurnDirection.none:
@@ -173,10 +166,11 @@ class DesireHelper:
def update_frogpilot_params(self):
is_metric = self.params.get_bool("IsMetric")
lateral_tune = self.params.get_bool("LateralTune")
self.turn_desires = lateral_tune and self.params.get_bool("TurnDesires")
self.nudgeless = self.params.get_bool("NudgelessLaneChange")
self.lane_change_delay = self.params.get_int("LaneChangeTime") if self.nudgeless else 0
self.lane_detection = self.nudgeless and self.params.get_bool("LaneDetection")
self.lane_detection = self.nudgeless and self.params.get_int("LaneDetectionWidth") != 0
self.lane_detection_width = self.params.get_int("LaneDetectionWidth") * (1 if is_metric else CV.FOOT_TO_METER) / 10 if self.lane_detection else 0
self.one_lane_change = self.nudgeless and self.params.get_bool("OneLaneChange")
self.turn_desires = self.params.get_bool("TurnDesires")

View File

@@ -93,22 +93,18 @@ class VCruiseHelper:
long_press = True
break
# Reverse the long press value for reverse cruise increase
if frogpilot_variables.reverse_cruise_increase:
long_press = not long_press
if button_type is None:
return
# Confirm or deny the new speed limit value
if speed_limit_changed:
if button_type == ButtonType.accelCruise:
self.params_memory.put_bool("SLCConfirmed", True);
self.params_memory.put_bool("SLCConfirmedPressed", True);
self.params_memory.put_bool("SLCConfirmed", True)
self.params_memory.put_bool("SLCConfirmedPressed", True)
return
elif button_type == ButtonType.decelCruise:
self.params_memory.put_bool("SLCConfirmed", False);
self.params_memory.put_bool("SLCConfirmedPressed", True);
self.params_memory.put_bool("SLCConfirmed", False)
self.params_memory.put_bool("SLCConfirmedPressed", True)
return
# Don't adjust speed when pressing resume to exit standstill
@@ -120,13 +116,13 @@ class VCruiseHelper:
if not self.button_change_states[button_type]["enabled"]:
return
v_cruise_delta = v_cruise_delta * (5 if long_press else 1)
if long_press and self.v_cruise_kph % v_cruise_delta != 0: # partial interval
v_cruise_delta_interval = frogpilot_variables.custom_cruise_increase_long if long_press else frogpilot_variables.custom_cruise_increase
v_cruise_delta = v_cruise_delta * v_cruise_delta_interval
if v_cruise_delta_interval % 5 == 0 and self.v_cruise_kph % v_cruise_delta != 0: # partial interval
self.v_cruise_kph = CRUISE_NEAREST_FUNC[button_type](self.v_cruise_kph / v_cruise_delta) * v_cruise_delta
else:
self.v_cruise_kph += v_cruise_delta * CRUISE_INTERVAL_SIGN[button_type]
# Apply offset
v_cruise_offset = (frogpilot_variables.set_speed_offset * CRUISE_INTERVAL_SIGN[button_type]) if long_press else 0
if v_cruise_offset < 0:
v_cruise_offset = frogpilot_variables.set_speed_offset - v_cruise_delta
@@ -155,25 +151,20 @@ class VCruiseHelper:
if self.CP.pcmCruise:
return
if frogpilot_variables.conditional_experimental_mode:
initial = V_CRUISE_INITIAL
else:
initial = V_CRUISE_INITIAL_EXPERIMENTAL_MODE if experimental_mode else V_CRUISE_INITIAL
initial = V_CRUISE_INITIAL_EXPERIMENTAL_MODE if experimental_mode else V_CRUISE_INITIAL
# 250kph or above probably means we never had a set speed
if any(b.type in (ButtonType.accelCruise, ButtonType.resumeCruise) for b in CS.buttonEvents) and self.v_cruise_kph_last < 250:
self.v_cruise_kph = self.v_cruise_kph_last
else:
# Initial set speed
if desired_speed_limit != 0 and frogpilot_variables.set_speed_limit:
# If there's a known speed limit and the corresponding FP toggle is set, push it to the car
self.v_cruise_kph = desired_speed_limit * CV.MS_TO_KPH
self.v_cruise_kph = int(round(desired_speed_limit * CV.MS_TO_KPH))
else:
# Use fixed initial set speed from mode etc.
self.v_cruise_kph = int(round(clip(CS.vEgo * CV.MS_TO_KPH, initial, V_CRUISE_MAX)))
self.v_cruise_cluster_kph = self.v_cruise_kph
def apply_deadzone(error, deadzone):
if error > deadzone:
error -= deadzone

View File

@@ -2,7 +2,7 @@
import math
import os
from enum import IntEnum
from typing import Dict, Union, Callable, List, Optional
from collections.abc import Callable
from cereal import log, car
import cereal.messaging as messaging
@@ -12,6 +12,7 @@ from openpilot.common.realtime import DT_CTRL
from openpilot.selfdrive.locationd.calibrationd import MIN_SPEED_FILTER
from openpilot.system.version import get_short_branch
params = Params()
params_memory = Params("/dev/shm/params")
AlertSize = log.ControlsState.AlertSize
@@ -51,12 +52,12 @@ EVENT_NAME = {v: k for k, v in EventName.schema.enumerants.items()}
class Events:
def __init__(self):
self.events: List[int] = []
self.static_events: List[int] = []
self.events: list[int] = []
self.static_events: list[int] = []
self.events_prev = dict.fromkeys(EVENTS.keys(), 0)
@property
def names(self) -> List[int]:
def names(self) -> list[int]:
return self.events
def __len__(self) -> int:
@@ -74,7 +75,7 @@ class Events:
def contains(self, event_type: str) -> bool:
return any(event_type in EVENTS.get(e, {}) for e in self.events)
def create_alerts(self, event_types: List[str], callback_args=None):
def create_alerts(self, event_types: list[str], callback_args=None):
if callback_args is None:
callback_args = []
@@ -135,7 +136,7 @@ class Alert:
self.creation_delay = creation_delay
self.alert_type = ""
self.event_type: Optional[str] = None
self.event_type: str | None = None
def __str__(self) -> str:
return f"{self.alert_text_1}/{self.alert_text_2} {self.priority} {self.visual_alert} {self.audible_alert}"
@@ -261,15 +262,14 @@ def no_gps_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, m
AlertStatus.normal, AlertSize.mid,
Priority.LOWER, VisualAlert.none, AudibleAlert.none, .2, creation_delay=300.)
def torque_nn_load_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
model_name = params_memory.get("NNFFModelName")
model_name = params.get("NNFFModelName", encoding='utf-8')
if model_name == "":
return Alert(
"NNFF Torque Controller not available",
"Donate logs to Twilsonco to get it added!",
"Donate logs to Twilsonco to get your car supported!",
AlertStatus.userPrompt, AlertSize.mid,
Priority.LOW, VisualAlert.none, AudibleAlert.prompt, 6.0)
Priority.LOW, VisualAlert.none, AudibleAlert.prompt, 5.0)
else:
return Alert(
"NNFF Torque Controller loaded",
@@ -379,13 +379,14 @@ def holiday_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster,
def no_lane_available_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert:
lane_width = sm['frogpilotPlan'].laneWidthLeft if CS.leftBlinker else sm['frogpilotPlan'].laneWidthRight
lane_width_msg = f"{lane_width:.1f} meters" if metric else f"{lane_width * CV.METER_TO_FOOT:.1f} feet"
return Alert(
"No lane available",
f"Detected lane width is only {lane_width_msg}",
AlertStatus.normal, AlertSize.mid,
Priority.LOWEST, VisualAlert.none, AudibleAlert.none, .2)
EVENTS: Dict[int, Dict[str, Union[Alert, AlertCallbackType]]] = {
EVENTS: dict[int, dict[str, Alert | AlertCallbackType]] = {
# ********** events with no alerts **********
EventName.stockFcw: {},
@@ -1009,7 +1010,13 @@ EVENTS: Dict[int, Dict[str, Union[Alert, AlertCallbackType]]] = {
},
# FrogPilot Events
EventName.frogSteerSaturated: {
EventName.blockUser: {
ET.PERMANENT: NormalPermanentAlert("Dashcam mode",
"Please don't use the 'Development' branch!",
priority=Priority.HIGHEST),
},
EventName.goatSteerSaturated: {
ET.WARNING: Alert(
"Turn Exceeds Steering Limit",
"JESUS TAKE THE WHEEL!!",
@@ -1110,6 +1117,14 @@ EVENTS: Dict[int, Dict[str, Union[Alert, AlertCallbackType]]] = {
Priority.LOW, VisualAlert.none, AudibleAlert.nessie, 4.),
},
EventName.accel40: {
ET.WARNING: Alert(
"Great Scott!",
"🚗💨",
AlertStatus.frogpilot, AlertSize.mid,
Priority.LOW, VisualAlert.none, AudibleAlert.doc, 4.),
},
EventName.firefoxSteerSaturated: {
ET.WARNING: Alert(
"Turn Exceeds Steering Limit",
@@ -1137,7 +1152,7 @@ EVENTS: Dict[int, Dict[str, Union[Alert, AlertCallbackType]]] = {
EventName.yourFrogTriedToKillMe: {
ET.PERMANENT: Alert(
"Your frog tried to kill me...",
"😡",
"👺",
AlertStatus.frogpilot, AlertSize.mid,
Priority.MID, VisualAlert.none, AudibleAlert.angry, 5.),
},
@@ -1150,7 +1165,7 @@ if __name__ == '__main__':
from collections import defaultdict
event_names = {v: k for k, v in EventName.schema.enumerants.items()}
alerts_by_type: Dict[str, Dict[Priority, List[str]]] = defaultdict(lambda: defaultdict(list))
alerts_by_type: dict[str, dict[Priority, list[str]]] = defaultdict(lambda: defaultdict(list))
CP = car.CarParams.new_message()
CS = car.CarState.new_message()
@@ -1162,7 +1177,7 @@ if __name__ == '__main__':
alert = alert(CP, CS, sm, False, 1)
alerts_by_type[et][alert.priority].append(event_names[i])
all_alerts: Dict[str, List[tuple[Priority, List[str]]]] = {}
all_alerts: dict[str, list[tuple[Priority, list[str]]]] = {}
for et, priority_alerts in alerts_by_type.items():
all_alerts[et] = sorted(priority_alerts.items(), key=lambda x: x[0], reverse=True)

View File

@@ -73,10 +73,10 @@ class LatControlTorque(LatControl):
self.steering_angle_deadzone_deg = self.torque_params.steeringAngleDeadzoneDeg
# Twilsonco's Lateral Neural Network Feedforward
self.use_nn = CI.has_lateral_torque_nn
self.use_lateral_jerk = CI.use_lateral_jerk
self.use_nnff = CI.use_nnff
self.use_nnff_lite = CI.use_nnff_lite
if self.use_nn or self.use_lateral_jerk:
if self.use_nnff or self.use_nnff_lite:
# Instantaneous lateral jerk changes very rapidly, making it not useful on its own,
# however, we can "look ahead" to the future planned lateral jerk in order to guage
# whether the current desired lateral jerk will persist into the future, i.e.
@@ -96,7 +96,7 @@ class LatControlTorque(LatControl):
self.t_diffs = np.diff(ModelConstants.T_IDXS)
self.desired_lat_jerk_time = CP.steerActuatorDelay + 0.3
if self.use_nn:
if self.use_nnff:
self.pitch = FirstOrderFilter(0.0, 0.5, 0.01)
# NN model takes current v_ego, lateral_accel, lat accel/jerk error, roll, and past/future/planned data
# of lat accel and roll
@@ -137,7 +137,7 @@ class LatControlTorque(LatControl):
if self.use_steering_angle:
actual_curvature = actual_curvature_vm
curvature_deadzone = abs(VM.calc_curvature(math.radians(self.steering_angle_deadzone_deg), CS.vEgo, 0.0))
if self.use_nn or self.use_lateral_jerk:
if self.use_nnff or self.use_nnff_lite:
actual_curvature_rate = -VM.calc_curvature(math.radians(CS.steeringRateDeg), CS.vEgo, 0.0)
actual_lateral_jerk = actual_curvature_rate * CS.vEgo ** 2
else:
@@ -152,14 +152,14 @@ class LatControlTorque(LatControl):
actual_lateral_accel = actual_curvature * CS.vEgo ** 2
lateral_accel_deadzone = curvature_deadzone * CS.vEgo ** 2
low_speed_factor = interp(CS.vEgo, LOW_SPEED_X, LOW_SPEED_Y if not self.use_nn else LOW_SPEED_Y_NN)**2
low_speed_factor = interp(CS.vEgo, LOW_SPEED_X, LOW_SPEED_Y if not self.use_nnff else LOW_SPEED_Y_NN)**2
setpoint = desired_lateral_accel + low_speed_factor * desired_curvature
measurement = actual_lateral_accel + low_speed_factor * actual_curvature
lateral_jerk_setpoint = 0
lateral_jerk_measurement = 0
if self.use_nn or self.use_lateral_jerk:
if self.use_nnff or self.use_nnff_lite:
# prepare "look-ahead" desired lateral jerk
lat_accel_friction_factor = self.lat_accel_friction_factor
if len(model_data.acceleration.y) == ModelConstants.IDX_N:
@@ -180,7 +180,7 @@ class LatControlTorque(LatControl):
lookahead_lateral_jerk = 0.0
model_good = model_data is not None and len(model_data.orientation.x) >= CONTROL_N
if self.use_nn and model_good:
if self.use_nnff and model_good:
# update past data
roll = params.roll
pitch = self.pitch.update(llk.calibratedOrientationNED.value[1]) if len(llk.calibratedOrientationNED.value) > 1 else 0.0
@@ -219,17 +219,17 @@ class LatControlTorque(LatControl):
# apply friction override for cars with low NN friction response
if self.nn_friction_override:
pid_log.error += self.torque_from_lateral_accel(LatControlInputs(0.0, 0.0, CS.vEgo, CS.aEgo), self.torque_params,
friction_input, lateral_accel_deadzone, friction_compensation=True, gravity_adjusted=False)
friction_input, lateral_accel_deadzone, friction_compensation=True, gravity_adjusted=False)
nn_log = nn_input + nnff_setpoint_input + nnff_measurement_input
else:
gravity_adjusted_lateral_accel = desired_lateral_accel - roll_compensation
torque_from_setpoint = self.torque_from_lateral_accel(LatControlInputs(setpoint, roll_compensation, CS.vEgo, CS.aEgo), self.torque_params,
lateral_jerk_setpoint, lateral_accel_deadzone, friction_compensation=self.use_lateral_jerk, gravity_adjusted=False)
lateral_jerk_setpoint, lateral_accel_deadzone, friction_compensation=self.use_nnff_lite, gravity_adjusted=False)
torque_from_measurement = self.torque_from_lateral_accel(LatControlInputs(measurement, roll_compensation, CS.vEgo, CS.aEgo), self.torque_params,
lateral_jerk_measurement, lateral_accel_deadzone, friction_compensation=self.use_lateral_jerk, gravity_adjusted=False)
lateral_jerk_measurement, lateral_accel_deadzone, friction_compensation=self.use_nnff_lite, gravity_adjusted=False)
pid_log.error = torque_from_setpoint - torque_from_measurement
error = desired_lateral_accel - actual_lateral_accel
if self.use_lateral_jerk:
if self.use_nnff_lite:
friction_input = lat_accel_friction_factor * error + self.lat_jerk_friction_factor * lookahead_lateral_jerk
else:
friction_input = error

View File

@@ -1,5 +1,6 @@
from cereal import car
from openpilot.common.numpy_fast import clip, interp
from openpilot.common.params import Params
from openpilot.common.realtime import DT_CTRL
from openpilot.selfdrive.controls.lib.drive_helpers import CONTROL_N, apply_deadzone
from openpilot.selfdrive.controls.lib.pid import PIDController
@@ -60,6 +61,12 @@ class LongControl:
self.v_pid = 0.0
self.last_output_accel = 0.0
# FrogPilot variables
self.params = Params()
self.params_memory = Params("/dev/shm/params")
self.update_frogpilot_params()
def reset(self, v_pid):
"""Reset PID controller and change setpoint"""
self.pid.reset()
@@ -129,4 +136,20 @@ class LongControl:
self.last_output_accel = clip(output_accel, accel_limits[0], accel_limits[1])
if self.params_memory.get_bool("FrogPilotTogglesUpdated"):
self.update_frogpilot_params()
return self.last_output_accel
def update_frogpilot_params(self):
kiV = [self.params.get_float("kiV1"), self.params.get_float("kiV2"), self.params.get_float("kiV3"), self.params.get_float("kiV4")]
kpV = [self.params.get_float("kpV1"), self.params.get_float("kpV2"), self.params.get_float("kpV3"), self.params.get_float("kpV4")]
if self.params.get_bool("Tuning"):
self.pid = PIDController((self.CP.longitudinalTuning.kpBP, kpV),
(self.CP.longitudinalTuning.kiBP, kiV),
k_f=self.CP.longitudinalTuning.kf, rate=1 / DT_CTRL)
else:
self.pid = PIDController((self.CP.longitudinalTuning.kpBP, self.CP.longitudinalTuning.kpV),
(self.CP.longitudinalTuning.kiBP, self.CP.longitudinalTuning.kiV),
k_f=self.CP.longitudinalTuning.kf, rate=1 / DT_CTRL)

161
selfdrive/controls/lib/longitudinal_planner.py Executable file → Normal file
View File

@@ -2,35 +2,36 @@
import math
import numpy as np
from openpilot.common.numpy_fast import clip, interp
from openpilot.common.params import Params
from cereal import log
import cereal.messaging as messaging
from openpilot.common.conversions import Conversions as CV
from openpilot.common.filter_simple import FirstOrderFilter
from openpilot.common.params import Params
from openpilot.common.simple_kalman import KF1D
from openpilot.common.realtime import DT_MDL
from openpilot.common.swaglog import cloudlog
from openpilot.selfdrive.modeld.constants import ModelConstants
from openpilot.selfdrive.car.interfaces import ACCEL_MIN, ACCEL_MAX
from openpilot.selfdrive.controls.lib.longcontrol import LongCtrlState
from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import LongitudinalMpc
from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import T_IDXS as T_IDXS_MPC
from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import T_IDXS as T_IDXS_MPC, LEAD_ACCEL_TAU
from openpilot.selfdrive.controls.lib.drive_helpers import V_CRUISE_MAX, CONTROL_N, get_speed_error
from openpilot.common.swaglog import cloudlog
from openpilot.system.version import get_short_branch
from openpilot.selfdrive.frogpilot.functions.frogpilot_functions import CRUISING_SPEED
from openpilot.selfdrive.frogpilot.controls.lib.model_manager import RADARLESS_MODELS
LON_MPC_STEP = 0.2 # first step is 0.2s
A_CRUISE_MIN = -1.2
A_CRUISE_MAX_VALS = [1.6, 1.2, 0.8, 0.6]
A_CRUISE_MAX_BP = [0., 10.0, 25., 40.]
ACCEL_MAX_PLUS = [ACCEL_MAX, 4.0]
ACCEL_MAX_PLUS_BP = [0., CRUISING_SPEED]
# Lookup table for turns
_A_TOTAL_MAX_V = [1.7, 3.2]
_A_TOTAL_MAX_BP = [20., 40.]
# Kalman filter states enum
LEAD_KALMAN_SPEED, LEAD_KALMAN_ACCEL = 0, 1
def get_max_accel(v_ego):
return interp(v_ego, A_CRUISE_MAX_BP, A_CRUISE_MAX_VALS)
@@ -51,6 +52,72 @@ def limit_accel_in_turns(v_ego, angle_steers, a_target, CP):
return [a_target[0], min(a_target[1], a_x_allowed)]
def lead_kf(v_lead: float, dt: float = 0.05):
# Lead Kalman Filter params, calculating K from A, C, Q, R requires the control library.
# hardcoding a lookup table to compute K for values of radar_ts between 0.01s and 0.2s
assert dt > .01 and dt < .2, "Radar time step must be between .01s and 0.2s"
A = [[1.0, dt], [0.0, 1.0]]
C = [1.0, 0.0]
#Q = np.matrix([[10., 0.0], [0.0, 100.]])
#R = 1e3
#K = np.matrix([[ 0.05705578], [ 0.03073241]])
dts = [dt * 0.01 for dt in range(1, 21)]
K0 = [0.12287673, 0.14556536, 0.16522756, 0.18281627, 0.1988689, 0.21372394,
0.22761098, 0.24069424, 0.253096, 0.26491023, 0.27621103, 0.28705801,
0.29750003, 0.30757767, 0.31732515, 0.32677158, 0.33594201, 0.34485814,
0.35353899, 0.36200124]
K1 = [0.29666309, 0.29330885, 0.29042818, 0.28787125, 0.28555364, 0.28342219,
0.28144091, 0.27958406, 0.27783249, 0.27617149, 0.27458948, 0.27307714,
0.27162685, 0.27023228, 0.26888809, 0.26758976, 0.26633338, 0.26511557,
0.26393339, 0.26278425]
K = [[interp(dt, dts, K0)], [interp(dt, dts, K1)]]
kf = KF1D([[v_lead], [0.0]], A, C, K)
return kf
class Lead:
def __init__(self):
self.dRel = 0.0
self.yRel = 0.0
self.vLead = 0.0
self.aLead = 0.0
self.vLeadK = 0.0
self.aLeadK = 0.0
self.aLeadTau = LEAD_ACCEL_TAU
self.prob = 0.0
self.status = False
self.kf: KF1D | None = None
def reset(self):
self.status = False
self.kf = None
self.aLeadTau = LEAD_ACCEL_TAU
def update(self, dRel: float, yRel: float, vLead: float, aLead: float, prob: float):
self.dRel = dRel
self.yRel = yRel
self.vLead = vLead
self.aLead = aLead
self.prob = prob
self.status = True
if self.kf is None:
self.kf = lead_kf(self.vLead)
else:
self.kf.update(self.vLead)
self.vLeadK = float(self.kf.x[LEAD_KALMAN_SPEED][0])
self.aLeadK = float(self.kf.x[LEAD_KALMAN_ACCEL][0])
# Learn if constant acceleration
if abs(self.aLeadK) < 0.5:
self.aLeadTau = LEAD_ACCEL_TAU
else:
self.aLeadTau *= 0.9
class LongitudinalPlanner:
def __init__(self, CP, init_v=0.0, init_a=0.0, dt=DT_MDL):
self.CP = CP
@@ -62,23 +129,26 @@ class LongitudinalPlanner:
self.v_desired_filter = FirstOrderFilter(init_v, 2.0, self.dt)
self.v_model_error = 0.0
self.lead_one = Lead()
self.lead_two = Lead()
self.v_desired_trajectory = np.zeros(CONTROL_N)
self.a_desired_trajectory = np.zeros(CONTROL_N)
self.j_desired_trajectory = np.zeros(CONTROL_N)
self.solverExecutionTime = 0.0
self.params = Params()
self.param_read_counter = 0
self.read_param()
self.personality = log.LongitudinalPersonality.standard
def read_param(self):
try:
self.personality = int(self.params.get('LongitudinalPersonality'))
except (ValueError, TypeError):
self.personality = log.LongitudinalPersonality.standard
# FrogPilot variables
self.params = Params()
self.params_memory = Params("/dev/shm/params")
self.radarless_model = self.params.get("Model", encoding='utf-8') in RADARLESS_MODELS
self.release = get_short_branch() == "FrogPilot"
self.update_frogpilot_params()
@staticmethod
def parse_model(model_msg, model_error):
def parse_model(model_msg, model_error, v_ego, taco_tune):
if (len(model_msg.position.x) == 33 and
len(model_msg.velocity.x) == 33 and
len(model_msg.acceleration.x) == 33):
@@ -91,12 +161,16 @@ class LongitudinalPlanner:
v = np.zeros(len(T_IDXS_MPC))
a = np.zeros(len(T_IDXS_MPC))
j = np.zeros(len(T_IDXS_MPC))
if taco_tune:
max_lat_accel = interp(v_ego, [5, 10, 20], [1.5, 2.0, 3.0])
curvatures = np.interp(T_IDXS_MPC, ModelConstants.T_IDXS, model_msg.orientationRate.z) / np.clip(v, 0.3, 100.0)
max_v = np.sqrt(max_lat_accel / (np.abs(curvatures) + 1e-3)) - 2.0
v = np.minimum(max_v, v)
return x, v, a, j
def update(self, sm, frogpilot_planner, params_memory):
if self.param_read_counter % 50 == 0 or params_memory.get_bool("PersonalityChangedViaUI") or params_memory.get_bool("PersonalityChangedViaWheel"):
self.read_param()
self.param_read_counter += 1
def update(self, sm):
self.mpc.mode = 'blended' if sm['controlsState'].experimentalMode else 'acc'
v_ego = sm['carState'].vEgo
@@ -112,7 +186,7 @@ class LongitudinalPlanner:
# No change cost when user is controlling the speed, or when standstill
prev_accel_constraint = not (reset_state or sm['carState'].standstill)
accel_limits = frogpilot_planner.accel_limits
accel_limits = [sm['frogpilotPlan'].minAcceleration, sm['frogpilotPlan'].maxAcceleration]
if self.mpc.mode == 'acc':
accel_limits_turns = limit_accel_in_turns(v_ego, sm['carState'].steeringAngleDeg, accel_limits, self.CP)
else:
@@ -120,10 +194,8 @@ class LongitudinalPlanner:
if reset_state:
self.v_desired_filter.x = v_ego
# Slowly ramp up the acceleration to prevent jerky behaviors from a standstill
max_acceleration = min(interp(v_ego, ACCEL_MAX_PLUS_BP, ACCEL_MAX_PLUS), accel_limits[1])
# Clip aEgo to cruise limits to prevent large accelerations when becoming active
self.a_desired = clip(sm['carState'].aEgo, accel_limits[0], max_acceleration)
self.a_desired = clip(sm['carState'].aEgo, accel_limits[0], accel_limits[1])
# Prevent divergence, smooth in current v_ego
self.v_desired_filter.x = max(0.0, self.v_desired_filter.update(v_ego))
@@ -136,13 +208,26 @@ class LongitudinalPlanner:
accel_limits_turns[0] = min(accel_limits_turns[0], self.a_desired + 0.05)
accel_limits_turns[1] = max(accel_limits_turns[1], self.a_desired - 0.05)
self.mpc.set_weights(prev_accel_constraint,
frogpilot_planner.custom_personalities, frogpilot_planner.aggressive_jerk, frogpilot_planner.standard_jerk, frogpilot_planner.relaxed_jerk,
personality=self.personality)
if self.radarless_model:
model_leads = list(sm['modelV2'].leadsV3)
# TODO lead state should be invalidated if its different point than the previous one
lead_states = [self.lead_one, self.lead_two]
for index in range(len(lead_states)):
if len(model_leads) > index:
model_lead = model_leads[index]
lead_states[index].update(model_lead.x[0], model_lead.y[0], model_lead.v[0], model_lead.a[0], model_lead.prob)
else:
lead_states[index].reset()
else:
self.lead_one = sm['radarState'].leadOne
self.lead_two = sm['radarState'].leadTwo
self.mpc.set_weights(sm['frogpilotPlan'].jerk, prev_accel_constraint, personality=sm['controlsState'].personality)
self.mpc.set_accel_limits(accel_limits_turns[0], accel_limits_turns[1])
self.mpc.set_cur_state(self.v_desired_filter.x, self.a_desired)
x, v, a, j = self.parse_model(sm['modelV2'], self.v_model_error)
self.mpc.update(sm['radarState'], frogpilot_planner.v_cruise, x, v, a, j, frogpilot_planner, personality=self.personality)
x, v, a, j = self.parse_model(sm['modelV2'], self.v_model_error, v_ego, self.taco_tune)
self.mpc.update(self.lead_one, self.lead_two, sm['frogpilotPlan'].vCruise, x, v, a, j, self.radarless_model, sm['frogpilotPlan'].tFollow,
sm['frogpilotCarControl'].trafficModeActive, personality=sm['controlsState'].personality)
self.v_desired_trajectory_full = np.interp(ModelConstants.T_IDXS, T_IDXS_MPC, self.mpc.v_solution)
self.a_desired_trajectory_full = np.interp(ModelConstants.T_IDXS, T_IDXS_MPC, self.mpc.a_solution)
@@ -160,9 +245,14 @@ class LongitudinalPlanner:
self.a_desired = float(interp(self.dt, ModelConstants.T_IDXS[:CONTROL_N], self.a_desired_trajectory))
self.v_desired_filter.x = self.v_desired_filter.x + self.dt * (self.a_desired + a_prev) / 2.0
frogpilot_planner.update(sm['carState'], sm['controlsState'], sm['modelV2'], self.mpc, sm, v_cruise, v_ego)
if self.params_memory.get_bool("FrogPilotTogglesUpdated"):
self.update_frogpilot_params()
def publish(self, sm, pm, frogpilot_planner):
def update_frogpilot_params(self):
lateral_tune = self.params.get_bool("LateralTune")
self.taco_tune = lateral_tune and self.params.get_bool("TacoTune") and not self.release
def publish(self, sm, pm):
plan_send = messaging.new_message('longitudinalPlan')
plan_send.valid = sm.all_checks(service_list=['carState', 'controlsState'])
@@ -175,13 +265,10 @@ class LongitudinalPlanner:
longitudinalPlan.accels = self.a_desired_trajectory.tolist()
longitudinalPlan.jerks = self.j_desired_trajectory.tolist()
longitudinalPlan.hasLead = sm['radarState'].leadOne.status
longitudinalPlan.hasLead = self.lead_one.status
longitudinalPlan.longitudinalPlanSource = self.mpc.source
longitudinalPlan.fcw = self.fcw
longitudinalPlan.solverExecutionTime = self.mpc.solve_time
longitudinalPlan.personality = self.personality
pm.send('longitudinalPlan', plan_send)
frogpilot_planner.publish(sm, pm, self.mpc)

View File

View File

@@ -0,0 +1,45 @@
#!/usr/bin/env python3
import random
import unittest
from openpilot.selfdrive.controls.lib.events import Alert, EVENTS
from openpilot.selfdrive.controls.lib.alertmanager import AlertManager
class TestAlertManager(unittest.TestCase):
def test_duration(self):
"""
Enforce that an alert lasts for max(alert duration, duration the alert is added)
"""
for duration in range(1, 100):
alert = None
while not isinstance(alert, Alert):
event = random.choice([e for e in EVENTS.values() if len(e)])
alert = random.choice(list(event.values()))
alert.duration = duration
# check two cases:
# - alert is added to AM for <= the alert's duration
# - alert is added to AM for > alert's duration
for greater in (True, False):
if greater:
add_duration = duration + random.randint(1, 10)
else:
add_duration = random.randint(1, duration)
show_duration = max(duration, add_duration)
AM = AlertManager()
for frame in range(duration+10):
if frame < add_duration:
AM.add_many(frame, [alert, ])
current_alert = AM.process_alerts(frame, {})
shown = current_alert is not None
should_show = frame <= show_duration
self.assertEqual(shown, should_show, msg=f"{frame=} {add_duration=} {duration=}")
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,43 @@
#!/usr/bin/env python3
import unittest
from parameterized import parameterized
from cereal import car, log
from openpilot.selfdrive.car.car_helpers import interfaces
from openpilot.selfdrive.car.honda.values import CAR as HONDA
from openpilot.selfdrive.car.toyota.values import CAR as TOYOTA
from openpilot.selfdrive.car.nissan.values import CAR as NISSAN
from openpilot.selfdrive.controls.lib.latcontrol_pid import LatControlPID
from openpilot.selfdrive.controls.lib.latcontrol_torque import LatControlTorque
from openpilot.selfdrive.controls.lib.latcontrol_angle import LatControlAngle
from openpilot.selfdrive.controls.lib.vehicle_model import VehicleModel
from openpilot.common.mock.generators import generate_liveLocationKalman
class TestLatControl(unittest.TestCase):
@parameterized.expand([(HONDA.CIVIC, LatControlPID), (TOYOTA.RAV4, LatControlTorque), (NISSAN.LEAF, LatControlAngle)])
def test_saturation(self, car_name, controller):
CarInterface, CarController, CarState = interfaces[car_name]
CP = CarInterface.get_non_essential_params(car_name)
CI = CarInterface(CP, CarController, CarState)
VM = VehicleModel(CP)
controller = controller(CP, CI)
CS = car.CarState.new_message()
CS.vEgo = 30
CS.steeringPressed = False
params = log.LiveParametersData.new_message()
llk = generate_liveLocationKalman()
for _ in range(1000):
_, _, lac_log = controller.update(True, CS, VM, params, False, 1, llk)
self.assertTrue(lac_log.saturated)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,70 @@
#!/usr/bin/env python3
import math
import unittest
import numpy as np
from control import StateSpace
from openpilot.selfdrive.car.honda.interface import CarInterface
from openpilot.selfdrive.car.honda.values import CAR
from openpilot.selfdrive.controls.lib.vehicle_model import VehicleModel, dyn_ss_sol, create_dyn_state_matrices
class TestVehicleModel(unittest.TestCase):
def setUp(self):
CP = CarInterface.get_non_essential_params(CAR.CIVIC)
self.VM = VehicleModel(CP)
def test_round_trip_yaw_rate(self):
# TODO: fix VM to work at zero speed
for u in np.linspace(1, 30, num=10):
for roll in np.linspace(math.radians(-20), math.radians(20), num=11):
for sa in np.linspace(math.radians(-20), math.radians(20), num=11):
yr = self.VM.yaw_rate(sa, u, roll)
new_sa = self.VM.get_steer_from_yaw_rate(yr, u, roll)
self.assertAlmostEqual(sa, new_sa)
def test_dyn_ss_sol_against_yaw_rate(self):
"""Verify that the yaw_rate helper function matches the results
from the state space model."""
for roll in np.linspace(math.radians(-20), math.radians(20), num=11):
for u in np.linspace(1, 30, num=10):
for sa in np.linspace(math.radians(-20), math.radians(20), num=11):
# Compute yaw rate based on state space model
_, yr1 = dyn_ss_sol(sa, u, roll, self.VM)
# Compute yaw rate using direct computations
yr2 = self.VM.yaw_rate(sa, u, roll)
self.assertAlmostEqual(float(yr1[0]), yr2)
def test_syn_ss_sol_simulate(self):
"""Verifies that dyn_ss_sol matches a simulation"""
for roll in np.linspace(math.radians(-20), math.radians(20), num=11):
for u in np.linspace(1, 30, num=10):
A, B = create_dyn_state_matrices(u, self.VM)
# Convert to discrete time system
ss = StateSpace(A, B, np.eye(2), np.zeros((2, 2)))
ss = ss.sample(0.01)
for sa in np.linspace(math.radians(-20), math.radians(20), num=11):
inp = np.array([[sa], [roll]])
# Simulate for 1 second
x1 = np.zeros((2, 1))
for _ in range(100):
x1 = ss.A @ x1 + ss.B @ inp
# Compute steady state solution directly
x2 = dyn_ss_sol(sa, u, roll, self.VM)
np.testing.assert_almost_equal(x1, x2, decimal=3)
if __name__ == "__main__":
unittest.main()

View File

@@ -12,7 +12,6 @@ x_dot = A*x + B*u
A depends on longitudinal speed, u [m/s], and vehicle parameters CP
"""
from typing import Tuple
import numpy as np
from numpy.linalg import solve
@@ -169,7 +168,7 @@ def kin_ss_sol(sa: float, u: float, VM: VehicleModel) -> np.ndarray:
return K * sa
def create_dyn_state_matrices(u: float, VM: VehicleModel) -> Tuple[np.ndarray, np.ndarray]:
def create_dyn_state_matrices(u: float, VM: VehicleModel) -> tuple[np.ndarray, np.ndarray]:
"""Returns the A and B matrix for the dynamics system
Args:

View File

@@ -6,8 +6,6 @@ from openpilot.common.swaglog import cloudlog
from openpilot.selfdrive.controls.lib.longitudinal_planner import LongitudinalPlanner
import cereal.messaging as messaging
from openpilot.selfdrive.frogpilot.functions.frogpilot_planner import FrogPilotPlanner
def publish_ui_plan(sm, pm, longitudinal_planner):
ui_send = messaging.new_message('uiPlan')
ui_send.valid = sm.all_checks(service_list=['carState', 'controlsState', 'modelV2'])
@@ -24,27 +22,22 @@ def plannerd_thread():
cloudlog.info("plannerd is waiting for CarParams")
params = Params()
params_memory = Params("/dev/shm/params")
with car.CarParams.from_bytes(params.get("CarParams", block=True)) as msg:
CP = msg
cloudlog.info("plannerd got CarParams: %s", CP.carName)
frogpilot_planner = FrogPilotPlanner(CP, params, params_memory)
longitudinal_planner = LongitudinalPlanner(CP)
pm = messaging.PubMaster(['longitudinalPlan', 'uiPlan', 'frogpilotPlan'])
sm = messaging.SubMaster(['carControl', 'carState', 'controlsState', 'radarState', 'modelV2', 'frogpilotNavigation'],
pm = messaging.PubMaster(['longitudinalPlan', 'uiPlan'])
sm = messaging.SubMaster(['carControl', 'carState', 'controlsState', 'radarState', 'modelV2', 'frogpilotCarControl', 'frogpilotPlan'],
poll='modelV2', ignore_avg_freq=['radarState'])
while True:
sm.update()
if sm.updated['modelV2']:
longitudinal_planner.update(sm, frogpilot_planner, params_memory)
longitudinal_planner.publish(sm, pm, frogpilot_planner)
longitudinal_planner.update(sm)
longitudinal_planner.publish(sm, pm)
publish_ui_plan(sm, pm, longitudinal_planner)
if params_memory.get_bool("FrogPilotTogglesUpdated"):
frogpilot_planner.update_frogpilot_params(params)
def main():
plannerd_thread()

View File

@@ -2,7 +2,7 @@
import importlib
import math
from collections import deque
from typing import Optional, Dict, Any
from typing import Any, Optional
import capnp
from cereal import messaging, log, car
@@ -125,7 +125,7 @@ def laplacian_pdf(x: float, mu: float, b: float):
return math.exp(-abs(x-mu)/b)
def match_vision_to_track(v_ego: float, lead: capnp._DynamicStructReader, tracks: Dict[int, Track]):
def match_vision_to_track(v_ego: float, lead: capnp._DynamicStructReader, tracks: dict[int, Track]):
offset_vision_dist = lead.x[0] - RADAR_TO_CAMERA
def prob(c):
@@ -133,7 +133,7 @@ def match_vision_to_track(v_ego: float, lead: capnp._DynamicStructReader, tracks
prob_y = laplacian_pdf(c.yRel, -lead.y[0], lead.yStd[0])
prob_v = laplacian_pdf(c.vRel + v_ego, lead.v[0], lead.vStd[0])
# This is isn't exactly right, but good heuristic
# This isn't exactly right, but it's a good heuristic
return prob_d * prob_y * prob_v
track = max(tracks.values(), key=prob)
@@ -166,8 +166,8 @@ def get_RadarState_from_vision(lead_msg: capnp._DynamicStructReader, v_ego: floa
}
def get_lead(v_ego: float, ready: bool, tracks: Dict[int, Track], lead_msg: capnp._DynamicStructReader,
model_v_ego: float, lead_detection_threshold: float = .5, low_speed_override: bool = True) -> Dict[str, Any]:
def get_lead(v_ego: float, ready: bool, tracks: dict[int, Track], lead_msg: capnp._DynamicStructReader,
model_v_ego: float, lead_detection_threshold: float, low_speed_override: bool = True) -> dict[str, Any]:
# Determine leads, this is where the essential logic happens
if len(tracks) > 0 and ready and lead_msg.prob > lead_detection_threshold:
track = match_vision_to_track(v_ego, lead_msg, tracks)
@@ -196,14 +196,14 @@ class RadarD:
def __init__(self, radar_ts: float, delay: int = 0):
self.current_time = 0.0
self.tracks: Dict[int, Track] = {}
self.tracks: dict[int, Track] = {}
self.kalman_params = KalmanParams(radar_ts)
self.v_ego = 0.0
self.v_ego_hist = deque([0.0], maxlen=delay+1)
self.last_v_ego_frame = -1
self.radar_state: Optional[capnp._DynamicStructBuilder] = None
self.radar_state: capnp._DynamicStructBuilder | None = None
self.radar_state_valid = False
self.ready = False

View File

@@ -0,0 +1,93 @@
#!/usr/bin/env python3
import importlib
from typing import Optional
from cereal import messaging, car
from openpilot.common.params import Params
from openpilot.common.realtime import Ratekeeper, Priority, config_realtime_process
from openpilot.common.swaglog import cloudlog
# radar tracks
SPEED, ACCEL = 0, 1 # Kalman filter states enum
# stationary qualification parameters
V_EGO_STATIONARY = 4. # no stationary object flag below this speed
RADAR_TO_CENTER = 2.7 # (deprecated) RADAR is ~ 2.7m ahead from center of car
RADAR_TO_CAMERA = 1.52 # RADAR is ~ 1.5m ahead from center of mesh frame
class RadarD:
def __init__(self, radar_ts: float, delay: int = 0):
self.points: dict[int, tuple[float, float, float]] = {}
self.radar_tracks_valid = False
def update(self, rr: Optional[car.RadarData]):
radar_points = []
radar_errors = []
if rr is not None:
radar_points = rr.points
radar_errors = rr.errors
self.radar_tracks_valid = len(radar_errors) == 0
self.points = {}
for pt in radar_points:
self.points[pt.trackId] = (pt.dRel, pt.yRel, pt.vRel)
def publish(self):
tracks_msg = messaging.new_message('liveTracks', len(self.points))
tracks_msg.valid = self.radar_tracks_valid
for index, tid in enumerate(sorted(self.points.keys())):
tracks_msg.liveTracks[index] = {
"trackId": tid,
"dRel": float(self.points[tid][0]) + RADAR_TO_CAMERA,
"yRel": -float(self.points[tid][1]),
"vRel": float(self.points[tid][2]),
}
return tracks_msg
# publishes radar tracks
def main():
config_realtime_process(5, Priority.CTRL_LOW)
# wait for stats about the car to come in from controls
cloudlog.info("radard is waiting for CarParams")
with car.CarParams.from_bytes(Params().get("CarParams", block=True)) as msg:
CP = msg
cloudlog.info("radard got CarParams")
# import the radar from the fingerprint
cloudlog.info("radard is importing %s", CP.carName)
RadarInterface = importlib.import_module(f'selfdrive.car.{CP.carName}.radar_interface').RadarInterface
# *** setup messaging
can_sock = messaging.sub_sock('can')
pub_sock = messaging.pub_sock('liveTracks')
RI = RadarInterface(CP)
# TODO timing is different between cars, need a single time step for all cars
# TODO just take the fastest one for now, and keep resending same messages for slower radars
rk = Ratekeeper(1.0 / CP.radarTimeStep, print_delay_threshold=None)
RD = RadarD(CP.radarTimeStep, RI.delay)
while 1:
can_strings = messaging.drain_sock_raw(can_sock, wait_for_one=True)
rr = RI.update(can_strings)
if rr is None:
continue
RD.update(rr)
msg = RD.publish()
pub_sock.send(msg.to_bytes())
rk.monitor_time()
if __name__ == "__main__":
main()

View File

View File

@@ -0,0 +1,135 @@
#!/usr/bin/env python3
import copy
import json
import os
import unittest
import random
from PIL import Image, ImageDraw, ImageFont
from cereal import log, car
from cereal.messaging import SubMaster
from openpilot.common.basedir import BASEDIR
from openpilot.common.params import Params
from openpilot.selfdrive.controls.lib.events import Alert, EVENTS, ET
from openpilot.selfdrive.controls.lib.alertmanager import set_offroad_alert
from openpilot.selfdrive.test.process_replay.process_replay import CONFIGS
AlertSize = log.ControlsState.AlertSize
OFFROAD_ALERTS_PATH = os.path.join(BASEDIR, "selfdrive/controls/lib/alerts_offroad.json")
# TODO: add callback alerts
ALERTS = []
for event_types in EVENTS.values():
for alert in event_types.values():
ALERTS.append(alert)
class TestAlerts(unittest.TestCase):
@classmethod
def setUpClass(cls):
with open(OFFROAD_ALERTS_PATH) as f:
cls.offroad_alerts = json.loads(f.read())
# Create fake objects for callback
cls.CS = car.CarState.new_message()
cls.CP = car.CarParams.new_message()
cfg = [c for c in CONFIGS if c.proc_name == 'controlsd'][0]
cls.sm = SubMaster(cfg.pubs)
def test_events_defined(self):
# Ensure all events in capnp schema are defined in events.py
events = car.CarEvent.EventName.schema.enumerants
for name, e in events.items():
if not name.endswith("DEPRECATED"):
fail_msg = "%s @%d not in EVENTS" % (name, e)
self.assertTrue(e in EVENTS.keys(), msg=fail_msg)
# ensure alert text doesn't exceed allowed width
def test_alert_text_length(self):
font_path = os.path.join(BASEDIR, "selfdrive/assets/fonts")
regular_font_path = os.path.join(font_path, "Inter-SemiBold.ttf")
bold_font_path = os.path.join(font_path, "Inter-Bold.ttf")
semibold_font_path = os.path.join(font_path, "Inter-SemiBold.ttf")
max_text_width = 2160 - 300 # full screen width is usable, minus sidebar
draw = ImageDraw.Draw(Image.new('RGB', (0, 0)))
fonts = {
AlertSize.small: [ImageFont.truetype(semibold_font_path, 74)],
AlertSize.mid: [ImageFont.truetype(bold_font_path, 88),
ImageFont.truetype(regular_font_path, 66)],
}
for alert in ALERTS:
if not isinstance(alert, Alert):
alert = alert(self.CP, self.CS, self.sm, metric=False, soft_disable_time=100)
# for full size alerts, both text fields wrap the text,
# so it's unlikely that they would go past the max width
if alert.alert_size in (AlertSize.none, AlertSize.full):
continue
for i, txt in enumerate([alert.alert_text_1, alert.alert_text_2]):
if i >= len(fonts[alert.alert_size]):
break
font = fonts[alert.alert_size][i]
left, _, right, _ = draw.textbbox((0, 0), txt, font)
width = right - left
msg = f"type: {alert.alert_type} msg: {txt}"
self.assertLessEqual(width, max_text_width, msg=msg)
def test_alert_sanity_check(self):
for event_types in EVENTS.values():
for event_type, a in event_types.items():
# TODO: add callback alerts
if not isinstance(a, Alert):
continue
if a.alert_size == AlertSize.none:
self.assertEqual(len(a.alert_text_1), 0)
self.assertEqual(len(a.alert_text_2), 0)
elif a.alert_size == AlertSize.small:
self.assertGreater(len(a.alert_text_1), 0)
self.assertEqual(len(a.alert_text_2), 0)
elif a.alert_size == AlertSize.mid:
self.assertGreater(len(a.alert_text_1), 0)
self.assertGreater(len(a.alert_text_2), 0)
else:
self.assertGreater(len(a.alert_text_1), 0)
self.assertGreaterEqual(a.duration, 0.)
if event_type not in (ET.WARNING, ET.PERMANENT, ET.PRE_ENABLE):
self.assertEqual(a.creation_delay, 0.)
def test_offroad_alerts(self):
params = Params()
for a in self.offroad_alerts:
# set the alert
alert = copy.copy(self.offroad_alerts[a])
set_offroad_alert(a, True)
alert['extra'] = ''
self.assertTrue(json.dumps(alert) == params.get(a, encoding='utf8'))
# then delete it
set_offroad_alert(a, False)
self.assertTrue(params.get(a) is None)
def test_offroad_alerts_extra_text(self):
params = Params()
for i in range(50):
# set the alert
a = random.choice(list(self.offroad_alerts))
alert = self.offroad_alerts[a]
set_offroad_alert(a, True, extra_text="a"*i)
written_alert = json.loads(params.get(a, encoding='utf8'))
self.assertTrue("a"*i == written_alert['extra'])
self.assertTrue(alert["text"] == written_alert['text'])
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,156 @@
#!/usr/bin/env python3
import itertools
import numpy as np
import unittest
from parameterized import parameterized_class
from cereal import log
from openpilot.selfdrive.controls.lib.drive_helpers import VCruiseHelper, V_CRUISE_MIN, V_CRUISE_MAX, V_CRUISE_INITIAL, IMPERIAL_INCREMENT
from cereal import car
from openpilot.common.conversions import Conversions as CV
from openpilot.selfdrive.test.longitudinal_maneuvers.maneuver import Maneuver
ButtonEvent = car.CarState.ButtonEvent
ButtonType = car.CarState.ButtonEvent.Type
def run_cruise_simulation(cruise, e2e, personality, t_end=20.):
man = Maneuver(
'',
duration=t_end,
initial_speed=max(cruise - 1., 0.0),
lead_relevancy=True,
initial_distance_lead=100,
cruise_values=[cruise],
prob_lead_values=[0.0],
breakpoints=[0.],
e2e=e2e,
personality=personality,
)
valid, output = man.evaluate()
assert valid
return output[-1, 3]
@parameterized_class(("e2e", "personality", "speed"), itertools.product(
[True, False], # e2e
log.LongitudinalPersonality.schema.enumerants, # personality
[5,35])) # speed
class TestCruiseSpeed(unittest.TestCase):
def test_cruise_speed(self):
print(f'Testing {self.speed} m/s')
cruise_speed = float(self.speed)
simulation_steady_state = run_cruise_simulation(cruise_speed, self.e2e, self.personality)
self.assertAlmostEqual(simulation_steady_state, cruise_speed, delta=.01, msg=f'Did not reach {self.speed} m/s')
# TODO: test pcmCruise
@parameterized_class(('pcm_cruise',), [(False,)])
class TestVCruiseHelper(unittest.TestCase):
def setUp(self):
self.CP = car.CarParams(pcmCruise=self.pcm_cruise)
self.v_cruise_helper = VCruiseHelper(self.CP)
self.reset_cruise_speed_state()
def reset_cruise_speed_state(self):
# Two resets previous cruise speed
for _ in range(2):
self.v_cruise_helper.update_v_cruise(car.CarState(cruiseState={"available": False}), enabled=False, is_metric=False)
def enable(self, v_ego, experimental_mode):
# Simulates user pressing set with a current speed
self.v_cruise_helper.initialize_v_cruise(car.CarState(vEgo=v_ego), experimental_mode)
def test_adjust_speed(self):
"""
Asserts speed changes on falling edges of buttons.
"""
self.enable(V_CRUISE_INITIAL * CV.KPH_TO_MS, False)
for btn in (ButtonType.accelCruise, ButtonType.decelCruise):
for pressed in (True, False):
CS = car.CarState(cruiseState={"available": True})
CS.buttonEvents = [ButtonEvent(type=btn, pressed=pressed)]
self.v_cruise_helper.update_v_cruise(CS, enabled=True, is_metric=False)
self.assertEqual(pressed, self.v_cruise_helper.v_cruise_kph == self.v_cruise_helper.v_cruise_kph_last)
def test_rising_edge_enable(self):
"""
Some car interfaces may enable on rising edge of a button,
ensure we don't adjust speed if enabled changes mid-press.
"""
# NOTE: enabled is always one frame behind the result from button press in controlsd
for enabled, pressed in ((False, False),
(False, True),
(True, False)):
CS = car.CarState(cruiseState={"available": True})
CS.buttonEvents = [ButtonEvent(type=ButtonType.decelCruise, pressed=pressed)]
self.v_cruise_helper.update_v_cruise(CS, enabled=enabled, is_metric=False)
if pressed:
self.enable(V_CRUISE_INITIAL * CV.KPH_TO_MS, False)
# Expected diff on enabling. Speed should not change on falling edge of pressed
self.assertEqual(not pressed, self.v_cruise_helper.v_cruise_kph == self.v_cruise_helper.v_cruise_kph_last)
def test_resume_in_standstill(self):
"""
Asserts we don't increment set speed if user presses resume/accel to exit cruise standstill.
"""
self.enable(0, False)
for standstill in (True, False):
for pressed in (True, False):
CS = car.CarState(cruiseState={"available": True, "standstill": standstill})
CS.buttonEvents = [ButtonEvent(type=ButtonType.accelCruise, pressed=pressed)]
self.v_cruise_helper.update_v_cruise(CS, enabled=True, is_metric=False)
# speed should only update if not at standstill and button falling edge
should_equal = standstill or pressed
self.assertEqual(should_equal, self.v_cruise_helper.v_cruise_kph == self.v_cruise_helper.v_cruise_kph_last)
def test_set_gas_pressed(self):
"""
Asserts pressing set while enabled with gas pressed sets
the speed to the maximum of vEgo and current cruise speed.
"""
for v_ego in np.linspace(0, 100, 101):
self.reset_cruise_speed_state()
self.enable(V_CRUISE_INITIAL * CV.KPH_TO_MS, False)
# first decrement speed, then perform gas pressed logic
expected_v_cruise_kph = self.v_cruise_helper.v_cruise_kph - IMPERIAL_INCREMENT
expected_v_cruise_kph = max(expected_v_cruise_kph, v_ego * CV.MS_TO_KPH) # clip to min of vEgo
expected_v_cruise_kph = float(np.clip(round(expected_v_cruise_kph, 1), V_CRUISE_MIN, V_CRUISE_MAX))
CS = car.CarState(vEgo=float(v_ego), gasPressed=True, cruiseState={"available": True})
CS.buttonEvents = [ButtonEvent(type=ButtonType.decelCruise, pressed=False)]
self.v_cruise_helper.update_v_cruise(CS, enabled=True, is_metric=False)
# TODO: fix skipping first run due to enabled on rising edge exception
if v_ego == 0.0:
continue
self.assertEqual(expected_v_cruise_kph, self.v_cruise_helper.v_cruise_kph)
def test_initialize_v_cruise(self):
"""
Asserts allowed cruise speeds on enabling with SET.
"""
for experimental_mode in (True, False):
for v_ego in np.linspace(0, 100, 101):
self.reset_cruise_speed_state()
self.assertFalse(self.v_cruise_helper.v_cruise_initialized)
self.enable(float(v_ego), experimental_mode)
self.assertTrue(V_CRUISE_INITIAL <= self.v_cruise_helper.v_cruise_kph <= V_CRUISE_MAX)
self.assertTrue(self.v_cruise_helper.v_cruise_initialized)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,45 @@
#!/usr/bin/env python3
import unittest
import itertools
from parameterized import parameterized_class
from cereal import log
from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import desired_follow_distance, get_T_FOLLOW
from openpilot.selfdrive.test.longitudinal_maneuvers.maneuver import Maneuver
def run_following_distance_simulation(v_lead, t_end=100.0, e2e=False, personality=0):
man = Maneuver(
'',
duration=t_end,
initial_speed=float(v_lead),
lead_relevancy=True,
initial_distance_lead=100,
speed_lead_values=[v_lead],
breakpoints=[0.],
e2e=e2e,
personality=personality,
)
valid, output = man.evaluate()
assert valid
return output[-1,2] - output[-1,1]
@parameterized_class(("e2e", "personality", "speed"), itertools.product(
[True, False], # e2e
[log.LongitudinalPersonality.relaxed, # personality
log.LongitudinalPersonality.standard,
log.LongitudinalPersonality.aggressive],
[0,10,35])) # speed
class TestFollowingDistance(unittest.TestCase):
def test_following_distance(self):
v_lead = float(self.speed)
simulation_steady_state = run_following_distance_simulation(v_lead, e2e=self.e2e, personality=self.personality)
correct_steady_state = desired_follow_distance(v_lead, v_lead, get_T_FOLLOW(self.personality))
err_ratio = 0.2 if self.e2e else 0.1
self.assertAlmostEqual(simulation_steady_state, correct_steady_state, delta=(err_ratio * correct_steady_state + .5))
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,89 @@
import unittest
import numpy as np
from openpilot.selfdrive.controls.lib.lateral_mpc_lib.lat_mpc import LateralMpc
from openpilot.selfdrive.controls.lib.drive_helpers import CAR_ROTATION_RADIUS
from openpilot.selfdrive.controls.lib.lateral_mpc_lib.lat_mpc import N as LAT_MPC_N
def run_mpc(lat_mpc=None, v_ref=30., x_init=0., y_init=0., psi_init=0., curvature_init=0.,
lane_width=3.6, poly_shift=0.):
if lat_mpc is None:
lat_mpc = LateralMpc()
lat_mpc.set_weights(1., .1, 0.0, .05, 800)
y_pts = poly_shift * np.ones(LAT_MPC_N + 1)
heading_pts = np.zeros(LAT_MPC_N + 1)
curv_rate_pts = np.zeros(LAT_MPC_N + 1)
x0 = np.array([x_init, y_init, psi_init, curvature_init])
p = np.column_stack([v_ref * np.ones(LAT_MPC_N + 1),
CAR_ROTATION_RADIUS * np.ones(LAT_MPC_N + 1)])
# converge in no more than 10 iterations
for _ in range(10):
lat_mpc.run(x0, p,
y_pts, heading_pts, curv_rate_pts)
return lat_mpc.x_sol
class TestLateralMpc(unittest.TestCase):
def _assert_null(self, sol, curvature=1e-6):
for i in range(len(sol)):
self.assertAlmostEqual(sol[0,i,1], 0., delta=curvature)
self.assertAlmostEqual(sol[0,i,2], 0., delta=curvature)
self.assertAlmostEqual(sol[0,i,3], 0., delta=curvature)
def _assert_simmetry(self, sol, curvature=1e-6):
for i in range(len(sol)):
self.assertAlmostEqual(sol[0,i,1], -sol[1,i,1], delta=curvature)
self.assertAlmostEqual(sol[0,i,2], -sol[1,i,2], delta=curvature)
self.assertAlmostEqual(sol[0,i,3], -sol[1,i,3], delta=curvature)
self.assertAlmostEqual(sol[0,i,0], sol[1,i,0], delta=curvature)
def test_straight(self):
sol = run_mpc()
self._assert_null(np.array([sol]))
def test_y_symmetry(self):
sol = []
for y_init in [-0.5, 0.5]:
sol.append(run_mpc(y_init=y_init))
self._assert_simmetry(np.array(sol))
def test_poly_symmetry(self):
sol = []
for poly_shift in [-1., 1.]:
sol.append(run_mpc(poly_shift=poly_shift))
self._assert_simmetry(np.array(sol))
def test_curvature_symmetry(self):
sol = []
for curvature_init in [-0.1, 0.1]:
sol.append(run_mpc(curvature_init=curvature_init))
self._assert_simmetry(np.array(sol))
def test_psi_symmetry(self):
sol = []
for psi_init in [-0.1, 0.1]:
sol.append(run_mpc(psi_init=psi_init))
self._assert_simmetry(np.array(sol))
def test_no_overshoot(self):
y_init = 1.
sol = run_mpc(y_init=y_init)
for y in list(sol[:,1]):
self.assertGreaterEqual(y_init, abs(y))
def test_switch_convergence(self):
lat_mpc = LateralMpc()
sol = run_mpc(lat_mpc=lat_mpc, poly_shift=3.0, v_ref=7.0)
right_psi_deg = np.degrees(sol[:,2])
sol = run_mpc(lat_mpc=lat_mpc, poly_shift=-3.0, v_ref=7.0)
left_psi_deg = np.degrees(sol[:,2])
np.testing.assert_almost_equal(right_psi_deg, -left_psi_deg, decimal=3)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,36 @@
#!/usr/bin/env python3
import unittest
import cereal.messaging as messaging
from openpilot.selfdrive.test.process_replay import replay_process_with_name
from openpilot.selfdrive.car.toyota.values import CAR as TOYOTA
class TestLeads(unittest.TestCase):
def test_radar_fault(self):
# if there's no radar-related can traffic, radard should either not respond or respond with an error
# this is tightly coupled with underlying car radar_interface implementation, but it's a good sanity check
def single_iter_pkg():
# single iter package, with meaningless cans and empty carState/modelV2
msgs = []
for _ in range(5):
can = messaging.new_message("can", 1)
cs = messaging.new_message("carState")
msgs.append(can.as_reader())
msgs.append(cs.as_reader())
model = messaging.new_message("modelV2")
msgs.append(model.as_reader())
return msgs
msgs = [m for _ in range(3) for m in single_iter_pkg()]
out = replay_process_with_name("radard", msgs, fingerprint=TOYOTA.COROLLA_TSS2)
states = [m for m in out if m.which() == "radarState"]
failures = [not state.valid and len(state.radarState.radarErrors) for state in states]
self.assertTrue(len(states) == 0 or all(failures))
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,120 @@
import os
from parameterized import parameterized
from cereal import log, car
import cereal.messaging as messaging
from openpilot.common.params import Params
from openpilot.selfdrive.boardd.boardd_api_impl import can_list_to_can_capnp
from openpilot.selfdrive.car.fingerprints import _FINGERPRINTS
from openpilot.selfdrive.car.toyota.values import CAR as TOYOTA
from openpilot.selfdrive.car.mazda.values import CAR as MAZDA
from openpilot.selfdrive.controls.lib.events import EVENT_NAME
from openpilot.selfdrive.manager.process_config import managed_processes
EventName = car.CarEvent.EventName
Ecu = car.CarParams.Ecu
COROLLA_FW_VERSIONS = [
(Ecu.engine, 0x7e0, None, b'\x0230ZC2000\x00\x00\x00\x00\x00\x00\x00\x0050212000\x00\x00\x00\x00\x00\x00\x00\x00'),
(Ecu.abs, 0x7b0, None, b'F152602190\x00\x00\x00\x00\x00\x00'),
(Ecu.eps, 0x7a1, None, b'8965B02181\x00\x00\x00\x00\x00\x00'),
(Ecu.fwdRadar, 0x750, 0xf, b'8821F4702100\x00\x00\x00\x00'),
(Ecu.fwdCamera, 0x750, 0x6d, b'8646F0201101\x00\x00\x00\x00'),
(Ecu.dsu, 0x791, None, b'881510201100\x00\x00\x00\x00'),
]
COROLLA_FW_VERSIONS_FUZZY = COROLLA_FW_VERSIONS[:-1] + [(Ecu.dsu, 0x791, None, b'xxxxxx')]
COROLLA_FW_VERSIONS_NO_DSU = COROLLA_FW_VERSIONS[:-1]
CX5_FW_VERSIONS = [
(Ecu.engine, 0x7e0, None, b'PYNF-188K2-F\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'),
(Ecu.abs, 0x760, None, b'K123-437K2-E\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'),
(Ecu.eps, 0x730, None, b'KJ01-3210X-G-00\x00\x00\x00\x00\x00\x00\x00\x00\x00'),
(Ecu.fwdRadar, 0x764, None, b'K123-67XK2-F\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'),
(Ecu.fwdCamera, 0x706, None, b'B61L-67XK2-T\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'),
(Ecu.transmission, 0x7e1, None, b'PYNC-21PS1-B\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'),
]
@parameterized.expand([
# TODO: test EventName.startup for release branches
# officially supported car
(EventName.startupMaster, TOYOTA.COROLLA, COROLLA_FW_VERSIONS, "toyota"),
(EventName.startupMaster, TOYOTA.COROLLA, COROLLA_FW_VERSIONS, "toyota"),
# dashcamOnly car
(EventName.startupNoControl, MAZDA.CX5, CX5_FW_VERSIONS, "mazda"),
(EventName.startupNoControl, MAZDA.CX5, CX5_FW_VERSIONS, "mazda"),
# unrecognized car with no fw
(EventName.startupNoFw, None, None, ""),
(EventName.startupNoFw, None, None, ""),
# unrecognized car
(EventName.startupNoCar, None, COROLLA_FW_VERSIONS[:1], "toyota"),
(EventName.startupNoCar, None, COROLLA_FW_VERSIONS[:1], "toyota"),
# fuzzy match
(EventName.startupMaster, TOYOTA.COROLLA, COROLLA_FW_VERSIONS_FUZZY, "toyota"),
(EventName.startupMaster, TOYOTA.COROLLA, COROLLA_FW_VERSIONS_FUZZY, "toyota"),
])
def test_startup_alert(expected_event, car_model, fw_versions, brand):
controls_sock = messaging.sub_sock("controlsState")
pm = messaging.PubMaster(['can', 'pandaStates'])
params = Params()
params.put_bool("OpenpilotEnabledToggle", True)
# Build capnn version of FW array
if fw_versions is not None:
car_fw = []
cp = car.CarParams.new_message()
for ecu, addr, subaddress, version in fw_versions:
f = car.CarParams.CarFw.new_message()
f.ecu = ecu
f.address = addr
f.fwVersion = version
f.brand = brand
if subaddress is not None:
f.subAddress = subaddress
car_fw.append(f)
cp.carVin = "1" * 17
cp.carFw = car_fw
params.put("CarParamsCache", cp.to_bytes())
else:
os.environ['SKIP_FW_QUERY'] = '1'
managed_processes['controlsd'].start()
assert pm.wait_for_readers_to_update('can', 5)
pm.send('can', can_list_to_can_capnp([[0, 0, b"", 0]]))
assert pm.wait_for_readers_to_update('pandaStates', 5)
msg = messaging.new_message('pandaStates', 1)
msg.pandaStates[0].pandaType = log.PandaState.PandaType.uno
pm.send('pandaStates', msg)
# fingerprint
if (car_model is None) or (fw_versions is not None):
finger = {addr: 1 for addr in range(1, 100)}
else:
finger = _FINGERPRINTS[car_model][0]
msgs = [[addr, 0, b'\x00'*length, 0] for addr, length in finger.items()]
for _ in range(1000):
# controlsd waits for boardd to echo back that it has changed the multiplexing mode
if not params.get_bool("ObdMultiplexingChanged"):
params.put_bool("ObdMultiplexingChanged", True)
pm.send('can', can_list_to_can_capnp(msgs))
assert pm.wait_for_readers_to_update('can', 5, dt=0.001), f"step: {_}"
ctrls = messaging.drain_sock(controls_sock)
if len(ctrls):
event_name = ctrls[0].controlsState.alertType.split("/")[0]
assert EVENT_NAME[expected_event] == event_name, f"expected {EVENT_NAME[expected_event]} for '{car_model}', got {event_name}"
break
else:
raise Exception(f"failed to fingerprint {car_model}")

View File

@@ -0,0 +1,109 @@
#!/usr/bin/env python3
import unittest
from cereal import car, log
from openpilot.common.realtime import DT_CTRL
from openpilot.selfdrive.car.car_helpers import interfaces
from openpilot.selfdrive.controls.controlsd import Controls, SOFT_DISABLE_TIME
from openpilot.selfdrive.controls.lib.events import Events, ET, Alert, Priority, AlertSize, AlertStatus, VisualAlert, \
AudibleAlert, EVENTS
from openpilot.selfdrive.car.mock.values import CAR as MOCK
State = log.ControlsState.OpenpilotState
# The event types that maintain the current state
MAINTAIN_STATES = {State.enabled: (None,), State.disabled: (None,), State.softDisabling: (ET.SOFT_DISABLE,),
State.preEnabled: (ET.PRE_ENABLE,), State.overriding: (ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL)}
ALL_STATES = tuple(State.schema.enumerants.values())
# The event types checked in DISABLED section of state machine
ENABLE_EVENT_TYPES = (ET.ENABLE, ET.PRE_ENABLE, ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL)
def make_event(event_types):
event = {}
for ev in event_types:
event[ev] = Alert("", "", AlertStatus.normal, AlertSize.small, Priority.LOW,
VisualAlert.none, AudibleAlert.none, 1.)
EVENTS[0] = event
return 0
class TestStateMachine(unittest.TestCase):
def setUp(self):
CarInterface, CarController, CarState = interfaces[MOCK.MOCK]
CP = CarInterface.get_non_essential_params(MOCK.MOCK)
CI = CarInterface(CP, CarController, CarState)
self.controlsd = Controls(CI=CI)
self.controlsd.events = Events()
self.controlsd.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL)
self.CS = car.CarState()
def test_immediate_disable(self):
for state in ALL_STATES:
for et in MAINTAIN_STATES[state]:
self.controlsd.events.add(make_event([et, ET.IMMEDIATE_DISABLE]))
self.controlsd.state = state
self.controlsd.state_transition(self.CS)
self.assertEqual(State.disabled, self.controlsd.state)
self.controlsd.events.clear()
def test_user_disable(self):
for state in ALL_STATES:
for et in MAINTAIN_STATES[state]:
self.controlsd.events.add(make_event([et, ET.USER_DISABLE]))
self.controlsd.state = state
self.controlsd.state_transition(self.CS)
self.assertEqual(State.disabled, self.controlsd.state)
self.controlsd.events.clear()
def test_soft_disable(self):
for state in ALL_STATES:
if state == State.preEnabled: # preEnabled considers NO_ENTRY instead
continue
for et in MAINTAIN_STATES[state]:
self.controlsd.events.add(make_event([et, ET.SOFT_DISABLE]))
self.controlsd.state = state
self.controlsd.state_transition(self.CS)
self.assertEqual(self.controlsd.state, State.disabled if state == State.disabled else State.softDisabling)
self.controlsd.events.clear()
def test_soft_disable_timer(self):
self.controlsd.state = State.enabled
self.controlsd.events.add(make_event([ET.SOFT_DISABLE]))
self.controlsd.state_transition(self.CS)
for _ in range(int(SOFT_DISABLE_TIME / DT_CTRL)):
self.assertEqual(self.controlsd.state, State.softDisabling)
self.controlsd.state_transition(self.CS)
self.assertEqual(self.controlsd.state, State.disabled)
def test_no_entry(self):
# Make sure noEntry keeps us disabled
for et in ENABLE_EVENT_TYPES:
self.controlsd.events.add(make_event([ET.NO_ENTRY, et]))
self.controlsd.state_transition(self.CS)
self.assertEqual(self.controlsd.state, State.disabled)
self.controlsd.events.clear()
def test_no_entry_pre_enable(self):
# preEnabled with noEntry event
self.controlsd.state = State.preEnabled
self.controlsd.events.add(make_event([ET.NO_ENTRY, ET.PRE_ENABLE]))
self.controlsd.state_transition(self.CS)
self.assertEqual(self.controlsd.state, State.preEnabled)
def test_maintain_states(self):
# Given current state's event type, we should maintain state
for state in ALL_STATES:
for et in MAINTAIN_STATES[state]:
self.controlsd.state = state
self.controlsd.events.add(make_event([et]))
self.controlsd.state_transition(self.CS)
self.assertEqual(self.controlsd.state, state)
self.controlsd.events.clear()
if __name__ == "__main__":
unittest.main()