From 2fbe9dbea1fd74e16fb73a804e313be94dfe87a2 Mon Sep 17 00:00:00 2001 From: Your Name Date: Sat, 27 Apr 2024 13:44:34 -0500 Subject: [PATCH] wip --- selfdrive/controls/.gitignore | 2 + selfdrive/controls/controlsd.py | 743 ++++++++---------- selfdrive/controls/lib/alertmanager.py | 11 +- selfdrive/controls/lib/desire_helper.py | 20 +- selfdrive/controls/lib/drive_helpers.py | 29 +- selfdrive/controls/lib/events.py | 45 +- selfdrive/controls/lib/latcontrol_torque.py | 26 +- selfdrive/controls/lib/longcontrol.py | 23 + .../controls/lib/longitudinal_planner.py | 161 +++- selfdrive/controls/lib/tests/__init__.py | 0 .../controls/lib/tests/test_alertmanager.py | 45 ++ .../controls/lib/tests/test_latcontrol.py | 43 + .../controls/lib/tests/test_vehicle_model.py | 70 ++ selfdrive/controls/lib/vehicle_model.py | 3 +- selfdrive/controls/plannerd.py | 15 +- selfdrive/controls/radard.py | 14 +- selfdrive/controls/radardless.py | 93 +++ selfdrive/controls/tests/__init__.py | 0 selfdrive/controls/tests/test_alerts.py | 135 ++++ selfdrive/controls/tests/test_cruise_speed.py | 156 ++++ .../controls/tests/test_following_distance.py | 45 ++ selfdrive/controls/tests/test_lateral_mpc.py | 89 +++ selfdrive/controls/tests/test_leads.py | 36 + selfdrive/controls/tests/test_startup.py | 120 +++ .../controls/tests/test_state_machine.py | 109 +++ 25 files changed, 1501 insertions(+), 532 deletions(-) create mode 100644 selfdrive/controls/.gitignore mode change 100755 => 100644 selfdrive/controls/lib/longitudinal_planner.py create mode 100644 selfdrive/controls/lib/tests/__init__.py create mode 100644 selfdrive/controls/lib/tests/test_alertmanager.py create mode 100644 selfdrive/controls/lib/tests/test_latcontrol.py create mode 100644 selfdrive/controls/lib/tests/test_vehicle_model.py create mode 100644 selfdrive/controls/radardless.py create mode 100644 selfdrive/controls/tests/__init__.py create mode 100644 selfdrive/controls/tests/test_alerts.py create mode 100644 selfdrive/controls/tests/test_cruise_speed.py create mode 100644 selfdrive/controls/tests/test_following_distance.py create mode 100644 selfdrive/controls/tests/test_lateral_mpc.py create mode 100644 selfdrive/controls/tests/test_leads.py create mode 100644 selfdrive/controls/tests/test_startup.py create mode 100644 selfdrive/controls/tests/test_state_machine.py diff --git a/selfdrive/controls/.gitignore b/selfdrive/controls/.gitignore new file mode 100644 index 0000000..22a371d --- /dev/null +++ b/selfdrive/controls/.gitignore @@ -0,0 +1,2 @@ +calibration_param +traces diff --git a/selfdrive/controls/controlsd.py b/selfdrive/controls/controlsd.py index 18db54e..b3adedb 100644 --- a/selfdrive/controls/controlsd.py +++ b/selfdrive/controls/controlsd.py @@ -9,10 +9,9 @@ from typing import SupportsFloat import cereal.messaging as messaging import openpilot.selfdrive.sentry as sentry -from cereal import car, log, custom +from cereal import car, custom, log from cereal.visionipc import VisionIpcClient, VisionStreamType -from panda import ALTERNATIVE_EXPERIENCE from openpilot.common.conversions import Conversions as CV from openpilot.common.numpy_fast import clip @@ -20,9 +19,8 @@ from openpilot.common.params import Params from openpilot.common.realtime import config_realtime_process, Priority, Ratekeeper, DT_CTRL from openpilot.common.swaglog import cloudlog -from openpilot.selfdrive.boardd.boardd import can_list_to_can_capnp -from openpilot.selfdrive.car.car_helpers import get_car, get_startup_event, get_one_can -from openpilot.selfdrive.car.interfaces import CarInterfaceBase +from openpilot.selfdrive.car.car_helpers import get_startup_event +from openpilot.selfdrive.car.card import CarD from openpilot.selfdrive.controls.lib.alertmanager import AlertManager, set_offroad_alert from openpilot.selfdrive.controls.lib.drive_helpers import VCruiseHelper, clip_curvature from openpilot.selfdrive.controls.lib.events import Events, ET @@ -36,9 +34,10 @@ from openpilot.selfdrive.controls.lib.vehicle_model import VehicleModel from openpilot.system.hardware import HARDWARE from openpilot.system.version import get_short_branch -from openpilot.selfdrive.frogpilot.functions.frogpilot_functions import CRUISING_SPEED +from openpilot.selfdrive.frogpilot.controls.lib.frogpilot_functions import CRUISING_SPEED, PROBABILITY, MovingAverageCalculator -from openpilot.selfdrive.frogpilot.functions.speed_limit_controller import SpeedLimitController +from openpilot.selfdrive.frogpilot.controls.lib.model_manager import RADARLESS_MODELS +from openpilot.selfdrive.frogpilot.controls.lib.speed_limit_controller import SpeedLimitController SOFT_DISABLE_TIME = 3 # seconds LDW_MIN_SPEED = 31 * CV.MPH_TO_MS @@ -58,8 +57,9 @@ LaneChangeState = log.LaneChangeState LaneChangeDirection = log.LaneChangeDirection EventName = car.CarEvent.EventName ButtonType = car.CarState.ButtonEvent.Type -SafetyModel = car.CarParams.SafetyModel GearShifter = car.CarState.GearShifter +SafetyModel = car.CarParams.SafetyModel +FrogPilotButtonType = custom.FrogPilotCarState.ButtonEvent.Type IGNORED_SAFETY_MODES = (SafetyModel.silent, SafetyModel.noOutput) CSID_MAP = {"1": EventName.roadCameraError, "2": EventName.wideRoadCameraError, "0": EventName.driverCameraError} @@ -68,100 +68,22 @@ ACTIVE_STATES = (State.enabled, State.softDisabling, State.overriding) ENABLED_STATES = (State.preEnabled, *ACTIVE_STATES) -class CarD: - CI: CarInterfaceBase - CS: car.CarState - - def __init__(self, CI=None): - self.can_sock = messaging.sub_sock('can', timeout=20) - self.sm = messaging.SubMaster(['pandaStates']) - self.pm = messaging.PubMaster(['sendcan', 'carState', 'carParams']) - - self.can_rcv_timeout_counter = 0 # conseuctive timeout count - self.can_rcv_cum_timeout_counter = 0 # cumulative timeout count - - self.params = Params() - - if CI is None: - # wait for one pandaState and one CAN packet - print("Waiting for CAN messages...") - get_one_can(self.can_sock) - - num_pandas = len(messaging.recv_one_retry(self.sm.sock['pandaStates']).pandaStates) - experimental_long_allowed = self.params.get_bool("ExperimentalLongitudinalEnabled") - self.CI, self.CP = get_car(self.can_sock, self.pm.sock['sendcan'], experimental_long_allowed, num_pandas) - else: - self.CI, self.CP = CI, CI.CP - - def initialize(self): - """Initialize CarInterface, once controls are ready""" - self.CI.init(self.CP, self.can_sock, self.pm.sock['sendcan']) - - def state_update(self, CC: car.CarControl, frogpilot_variables): - """carState update loop, driven by can""" - - # TODO: This should not depend on carControl - - # Update carState from CAN - can_strs = messaging.drain_sock_raw(self.can_sock, wait_for_one=True) - self.CS = self.CI.update(CC, can_strs, frogpilot_variables) - - self.sm.update(0) - - can_rcv_valid = len(can_strs) > 0 - - # Check for CAN timeout - if not can_rcv_valid: - self.can_rcv_timeout_counter += 1 - self.can_rcv_cum_timeout_counter += 1 - else: - self.can_rcv_timeout_counter = 0 - - self.can_rcv_timeout = self.can_rcv_timeout_counter >= 5 - - if can_rcv_valid and REPLAY: - self.can_log_mono_time = messaging.log_from_bytes(can_strs[0]).logMonoTime - - return self.CS - - def state_publish(self, car_events): - """carState and carParams publish loop""" - - # TODO: carState should be independent of the event loop - - # carState - cs_send = messaging.new_message('carState') - cs_send.valid = self.CS.canValid - cs_send.carState = self.CS - cs_send.carState.events = car_events - self.pm.send('carState', cs_send) - - # carParams - logged every 50 seconds (> 1 per segment) - if (self.sm.frame % int(50. / DT_CTRL) == 0): - cp_send = messaging.new_message('carParams') - cp_send.valid = True - cp_send.carParams = self.CP - self.pm.send('carParams', cp_send) - - def controls_update(self, CC: car.CarControl, frogpilot_variables): - """control update loop, driven by carControl""" - - # send car controls over can - now_nanos = self.can_log_mono_time if REPLAY else int(time.monotonic() * 1e9) - actuators_output, can_sends = self.CI.apply(CC, now_nanos, frogpilot_variables) - self.pm.send('sendcan', can_list_to_can_capnp(can_sends, msgtype='sendcan', valid=self.CS.canValid)) - - return actuators_output - - class Controls: def __init__(self, CI=None): self.card = CarD(CI) - self.CP = self.card.CP + self.params = Params() + self.params_memory = Params("/dev/shm/params") + self.params_storage = Params("/persist/params") + + self.radarless_model = self.params.get("Model", encoding='utf-8') in RADARLESS_MODELS + + with car.CarParams.from_bytes(self.params.get("CarParams", block=True)) as msg: + # TODO: this shouldn't need to be a builder + self.CP = msg.as_builder() + self.CI = self.card.CI - config_realtime_process(4, Priority.CTRL_HIGH) # Ensure the current branch is cached, otherwise the first iteration of controlsd lags self.branch = get_short_branch() @@ -174,35 +96,13 @@ class Controls: self.log_sock = messaging.sub_sock('androidLog') - # FrogPilot variables - self.params = Params() - self.params_memory = Params("/dev/shm/params") - self.params_storage = Params("/persist/comma/params") - - self.frogpilot_variables = SimpleNamespace() - - self.drive_added = False - self.driving_gear = False - self.fcw_random_event_triggered = False - self.holiday_theme_alerted = False - self.previously_enabled = False - self.openpilot_crashed = False - self.previously_enabled = False - self.random_event_triggered = False - self.stopped_for_light_previously = False - - self.drive_distance = 0 - self.drive_time = 0 - self.max_acceleration = 0 - self.previous_lead_distance = 0 - self.previous_speed_limit = SpeedLimitController.desired_speed_limit - self.random_event_timer = 0 - ignore = self.sensor_packets + ['testJoystick'] if SIMULATION: ignore += ['driverCameraState', 'managerState'] + if self.radarless_model: + ignore += ['radarState'] self.sm = messaging.SubMaster(['deviceState', 'pandaStates', 'peripheralState', 'modelV2', 'liveCalibration', - 'driverMonitoringState', 'longitudinalPlan', 'liveLocationKalman', + 'carOutput', 'driverMonitoringState', 'longitudinalPlan', 'liveLocationKalman', 'managerState', 'liveParameters', 'radarState', 'liveTorqueParameters', 'testJoystick', 'frogpilotPlan'] + self.camera_packets + self.sensor_packets, ignore_alive=ignore, ignore_avg_freq=ignore+['radarState', 'testJoystick'], ignore_valid=['testJoystick', ], @@ -210,53 +110,16 @@ class Controls: self.joystick_mode = self.params.get_bool("JoystickDebugMode") - # set alternative experiences from parameters - self.disengage_on_accelerator = self.params.get_bool("DisengageOnAccelerator") - self.CP.alternativeExperience = 0 - if not self.disengage_on_accelerator: - self.CP.alternativeExperience |= ALTERNATIVE_EXPERIENCE.DISABLE_DISENGAGE_ON_GAS - - # Set "Always On Lateral" conditions - self.always_on_lateral = self.params.get_bool("AlwaysOnLateral") - self.always_on_lateral_main = self.params.get_bool("AlwaysOnLateralMain") - if self.always_on_lateral: - self.CP.alternativeExperience |= ALTERNATIVE_EXPERIENCE.ALWAYS_ON_LATERAL - if self.disengage_on_accelerator: - self.disengage_on_accelerator = False - self.params.put_bool("DisengageOnAccelerator", False) - - self.CP.alternativeExperience |= ALTERNATIVE_EXPERIENCE.RAISE_LONGITUDINAL_LIMITS_TO_ISO_MAX - # read params + self.disengage_on_accelerator = self.params.get_bool("DisengageOnAccelerator") self.is_metric = self.params.get_bool("IsMetric") self.is_ldw_enabled = self.params.get_bool("IsLdwEnabled") - openpilot_enabled_toggle = self.params.get_bool("OpenpilotEnabledToggle") - - self.update_frogpilot_params() # detect sound card presence and ensure successful init sounds_available = HARDWARE.get_sound_card_online() car_recognized = self.CP.carName != 'mock' - controller_available = self.CI.CC is not None and openpilot_enabled_toggle and not self.CP.dashcamOnly - self.CP.passive = not car_recognized or not controller_available or self.CP.dashcamOnly - if self.CP.passive: - safety_config = car.CarParams.SafetyConfig.new_message() - safety_config.safetyModel = car.CarParams.SafetyModel.noOutput - self.CP.safetyConfigs = [safety_config] - - # Write previous route's CarParams - prev_cp = self.params.get("CarParamsPersistent") - if prev_cp is not None: - self.params.put("CarParamsPrevRoute", prev_cp) - - # Write CarParams for radard - cp_bytes = self.CP.to_bytes() - self.params.put("CarParams", cp_bytes) - self.params.put_nonblocking("CarParamsCache", cp_bytes) - self.params.put_nonblocking("CarParamsPersistent", cp_bytes) - # cleanup old params if not self.CP.experimentalLongitudinalAvailable: self.params.remove("ExperimentalLongitudinalEnabled") @@ -295,16 +158,16 @@ class Controls: self.current_alert_types = [ET.PERMANENT] self.logged_comm_issue = None self.not_running_prev = None - self.last_actuators = car.CarControl.Actuators.new_message() self.steer_limited = False self.desired_curvature = 0.0 self.experimental_mode = False + self.personality = self.read_personality_param() self.v_cruise_helper = VCruiseHelper(self.CP) self.recalibrating_seen = False self.can_log_mono_time = 0 - self.startup_event = get_startup_event(car_recognized, controller_available, len(self.CP.carFw) > 0) + self.startup_event = get_startup_event(car_recognized, not self.CP.passive, len(self.CP.carFw) > 0) if not sounds_available: self.events.add(EventName.soundsUnavailable, static=True) @@ -320,6 +183,36 @@ class Controls: # controlsd is driven by can recv, expected at 100Hz self.rk = Ratekeeper(100, print_delay_threshold=None) + # FrogPilot variables + self.frogpilot_variables = SimpleNamespace() + + self.block_user = self.branch == "FrogPilot-Development" and not self.params_storage.get_bool("FrogsGoMoo") + + self.always_on_lateral = self.params.get_bool("AlwaysOnLateral") + self.always_on_lateral_main = self.always_on_lateral and self.params.get_bool("AlwaysOnLateralMain") + + self.drive_added = False + self.fcw_random_event_triggered = False + self.holiday_theme_alerted = False + self.onroad_distance_pressed = False + self.openpilot_crashed_triggered = False + self.previously_enabled = False + self.random_event_triggered = False + self.speed_check = False + + self.crashed_timer = 0 + self.drive_distance = 0 + self.drive_time = 0 + self.max_acceleration = 0 + self.previous_lead_distance = 0 + self.previous_speed_limit = 0 + self.random_event_timer = 0 + self.speed_limit_timer = 0 + + self.green_light_mac = MovingAverageCalculator() + + self.update_frogpilot_params() + def set_initial_state(self): if REPLAY: controls_state = Params().get("ReplayControlsState") @@ -335,20 +228,6 @@ class Controls: self.events.clear() - frogpilot_plan = self.sm['frogpilotPlan'] - - # Show crash log event if openpilot crashed - if os.path.isfile(os.path.join(sentry.CRASHES_DIR, 'error.txt')): - self.events.add(EventName.openpilotCrashed) - if self.random_events and not self.openpilot_crashed: - self.events.add(EventName.openpilotCrashedRandomEvents) - self.openpilot_crashed = True - - # Show holiday related event to indicate which holiday is active - if self.sm.frame >= 1000 and self.holiday_themes and self.params_memory.get_int("CurrentHolidayTheme") != 0 and not self.holiday_theme_alerted: - self.events.add(EventName.holidayActive) - self.holiday_theme_alerted = True - # Add joystick event, static on cars, dynamic on nonCars if self.joystick_mode: self.events.add(EventName.joystickDebug) @@ -368,10 +247,6 @@ class Controls: if self.CP.passive: return - # show alert to indicate whether NNFF is loaded - if self.sm.frame == 550 and self.CP.lateralTuning.which() == 'torque' and self.CI.has_lateral_torque_nn: - self.events.add(EventName.torqueNNLoad) - # Block resume if cruise never previously enabled resume_pressed = any(be.type in (ButtonType.accelCruise, ButtonType.resumeCruise) for be in CS.buttonEvents) if not self.CP.pcmCruise and not self.v_cruise_helper.v_cruise_initialized and resume_pressed: @@ -398,7 +273,8 @@ class Controls: # Create events for temperature, disk space, and memory if self.sm['deviceState'].thermalStatus >= ThermalStatus.red: - self.events.add(EventName.overheat) + if not self.increase_thermal_limits or self.sm['deviceState'].thermalStatus == ThermalStatus.danger: + self.events.add(EventName.overheat) if self.sm['deviceState'].freeSpacePercent < 7 and not SIMULATION: # under 7% of space free no enable allowed self.events.add(EventName.outOfSpace) @@ -435,33 +311,27 @@ class Controls: # Handle lane change if self.sm['modelV2'].meta.laneChangeState == LaneChangeState.preLaneChange: direction = self.sm['modelV2'].meta.laneChangeDirection - desired_lane = frogpilot_plan.laneWidthLeft if direction == LaneChangeDirection.left else frogpilot_plan.laneWidthRight - lane_available = desired_lane >= self.lane_detection_width - if (CS.leftBlindspot and direction == LaneChangeDirection.left) or \ (CS.rightBlindspot and direction == LaneChangeDirection.right): if self.loud_blindspot_alert: self.events.add(EventName.laneChangeBlockedLoud) else: self.events.add(EventName.laneChangeBlocked) - elif not lane_available: - self.events.add(EventName.noLaneAvailable) else: if direction == LaneChangeDirection.left: - self.events.add(EventName.preLaneChangeLeft) + if self.sm['frogpilotPlan'].laneWidthLeft >= self.lane_detection_width: + self.events.add(EventName.preLaneChangeLeft) + else: + self.events.add(EventName.noLaneAvailable) else: - self.events.add(EventName.preLaneChangeRight) + if self.sm['frogpilotPlan'].laneWidthRight >= self.lane_detection_width: + self.events.add(EventName.preLaneChangeRight) + else: + self.events.add(EventName.noLaneAvailable) elif self.sm['modelV2'].meta.laneChangeState in (LaneChangeState.laneChangeStarting, LaneChangeState.laneChangeFinishing): self.events.add(EventName.laneChange) - # Handle turning - if not CS.standstill: - if self.sm['modelV2'].meta.turnDirection == Desire.turnLeft: - self.events.add(EventName.turningLeft) - elif self.sm['modelV2'].meta.turnDirection == Desire.turnRight: - self.events.add(EventName.turningRight) - for i, pandaState in enumerate(self.sm['pandaStates']): # All pandas must match the list of safetyConfigs, and if outside this list, must be silent or noOutput if i < len(self.CP.safetyConfigs): @@ -497,8 +367,9 @@ class Controls: self.events.add(EventName.cameraFrameRate) if not REPLAY and self.rk.lagging: self.events.add(EventName.controlsdLagging) - if len(self.sm['radarState'].radarErrors) or (not self.rk.lagging and not self.sm.all_checks(['radarState'])): - self.events.add(EventName.radarFault) + if not self.radarless_model: + if len(self.sm['radarState'].radarErrors) or (not self.rk.lagging and not self.sm.all_checks(['radarState'])): + self.events.add(EventName.radarFault) if not self.sm.valid['pandaStates']: self.events.add(EventName.usbError) if CS.canTimeout: @@ -560,7 +431,6 @@ class Controls: elif self.fcw_random_event_triggered and self.random_events: self.events.add(EventName.yourFrogTriedToKillMe) self.fcw_random_event_triggered = False - self.random_event_triggered = True for m in messaging.drain_sock(self.log_sock, wait_for_one=False): try: @@ -585,147 +455,12 @@ class Controls: if self.sm['modelV2'].frameDropPerc > 20: self.events.add(EventName.modeldLagging) - # Store the total distance traveled - self.drive_distance += CS.vEgo * DT_CTRL - self.drive_time += DT_CTRL - - # Store the current drive's data after a minute when at a standstill - Need to do this live for comma powerless users - if self.drive_time > 60 and CS.standstill: - current_total_distance = self.params.get_float("FrogPilotKilometers") - distance_to_add = self.drive_distance / 1000 - new_total_distance = current_total_distance + distance_to_add - - self.params.put_float("FrogPilotKilometers", new_total_distance) - self.params_storage.put_float("FrogPilotKilometers", new_total_distance) - - self.drive_distance = 0 - - current_total_time = self.params.get_float("FrogPilotMinutes") - time_to_add = self.drive_time / 60 - new_total_time = current_total_time + time_to_add - - self.params.put_float("FrogPilotMinutes", new_total_time) - self.params_storage.put_float("FrogPilotMinutes", new_total_time) - - self.drive_time = 0 - - # Only count the drive if it lasted longer than 5 minutes - if self.sm.frame * DT_CTRL > 60 * 5 and not self.drive_added: - new_total_drives = self.params.get_int("FrogPilotDrives") + 1 - - self.params.put_int("FrogPilotDrives", new_total_drives) - self.params_storage.put_int("FrogPilotDrives", new_total_drives) - - self.drive_added = True - - # Acceleration Random Event alerts - if self.random_events: - acceleration = CS.aEgo - - if not CS.gasPressed: - self.max_acceleration = max(acceleration, self.max_acceleration) - else: - self.max_acceleration = 0 - - if 3.5 > self.max_acceleration >= 3.0 and acceleration < 1.5: - self.events.add(EventName.accel30) - self.params_memory.put_int("CurrentRandomEvent", 2) - self.random_event_triggered = True - self.max_acceleration = 0 - elif self.max_acceleration >= 3.5 and acceleration < 1.5: - self.events.add(EventName.accel35) - self.params_memory.put_int("CurrentRandomEvent", 3) - self.random_event_triggered = True - self.max_acceleration = 0 - - # Green light alert - if self.green_light_alert: - stopped_for_light = frogpilot_plan.redLight and CS.standstill - green_light = not stopped_for_light and self.stopped_for_light_previously - self.stopped_for_light_previously = stopped_for_light - - self.previously_enabled |= (self.enabled or self.FPCC.alwaysOnLateral) and CS.vEgo > CRUISING_SPEED - self.previously_enabled &= self.driving_gear - - green_light &= self.previously_enabled - green_light &= not CS.gasPressed - green_light &= not self.sm['longitudinalPlan'].hasLead - - if green_light: - self.events.add(EventName.greenLight) - - # Lead departing alert - if self.lead_departing_alert and self.sm.frame % 50 == 0: - lead = self.sm['radarState'].leadOne - lead_distance = lead.dRel - lead_departing = lead_distance - self.previous_lead_distance > 0.5 and self.previous_lead_distance != 0 and CS.standstill - self.previous_lead_distance = lead_distance - - lead_departing &= not CS.gasPressed - lead_departing &= lead.vLead > 1 - lead_departing &= self.driving_gear - - if lead_departing: - self.events.add(EventName.leadDeparting) - - # Speed limit changed alert - if self.speed_limit_alert or self.speed_limit_confirmation: - current_speed_limit = frogpilot_plan.slcSpeedLimit - desired_speed_limit = SpeedLimitController.desired_speed_limit - - speed_limit_changed = desired_speed_limit != self.previous_speed_limit and abs(current_speed_limit - desired_speed_limit) > 1 - - speed_limit_changed_lower = speed_limit_changed and self.previous_speed_limit > desired_speed_limit - speed_limit_changed_higher = speed_limit_changed and self.previous_speed_limit < desired_speed_limit - - self.previous_speed_limit = desired_speed_limit - - if speed_limit_changed_lower: - if self.speed_limit_confirmation_lower: - self.FPCC.speedLimitChanged = True - else: - self.params_memory.put_bool("SLCConfirmed", True) - if speed_limit_changed_higher: - if self.speed_limit_confirmation_higher: - self.FPCC.speedLimitChanged = True - else: - self.params_memory.put_bool("SLCConfirmed", True) - - if self.params_memory.get_bool("SLCConfirmedPressed") or abs(current_speed_limit - desired_speed_limit) < 1 or not self.speed_limit_confirmation: - self.FPCC.speedLimitChanged = False - self.params_memory.put_bool("SLCConfirmedPressed", False) - - if speed_limit_changed and self.speed_limit_alert: - self.events.add(EventName.speedLimitChanged) - - # Cancel the confirmation message after 10 seconds - if self.FPCC.speedLimitChanged: - self.speed_limit_timer += 1 - if self.speed_limit_timer * DT_CTRL >= 10: - self.FPCC.speedLimitChanged = False - self.speed_limit_timer = 0 - else: - self.speed_limit_timer = 0 - else: - self.FPCC.speedLimitChanged = False - - # vCruise set to 69 Random Event alert - if self.random_events: - conversion = 1 if self.is_metric else CV.KPH_TO_MPH - v_cruise = self.v_cruise_helper.v_cruise_cluster_kph if self.v_cruise_helper.v_cruise_cluster_kph != 0.0 else self.v_cruise_helper.v_cruise_kph - v_cruise *= conversion - - if 70 > v_cruise >= 69: - if not self.vCruise69_alert_played: - self.events.add(EventName.vCruise69) - self.vCruise69_alert_played = True - else: - self.vCruise69_alert_played = False + self.update_frogpilot_events(CS) def data_sample(self): """Receive data from sockets and update carState""" - CS = self.card.state_update(self.CC, self.frogpilot_variables) + CS = self.card.state_update(self.frogpilot_variables) self.sm.update(0) @@ -857,6 +592,9 @@ class Controls: if self.active: self.current_alert_types.append(ET.WARNING) + if self.FPCC.alwaysOnLateral: + self.current_alert_types.append(ET.WARNING) + def state_control(self, CS): """Given the state, this function returns a CarControl packet""" @@ -879,41 +617,11 @@ class Controls: CC = car.CarControl.new_message() CC.enabled = self.enabled - # FrogPilot functions - frogpilot_plan = self.sm['frogpilotPlan'] - - # Reset the Random Event flag after 5 seconds - if self.random_event_triggered: - self.random_event_timer += 1 - if self.random_event_timer * DT_CTRL >= 4: - self.random_event_triggered = False - self.random_event_timer = 0 - self.params_memory.remove("CurrentRandomEvent") - - # Update Experimental Mode - if self.frogpilot_variables.conditional_experimental_mode: - self.experimental_mode = frogpilot_plan.conditionalExperimental - - # Gear Check - self.driving_gear = CS.gearShifter not in (GearShifter.neutral, GearShifter.park, GearShifter.reverse, GearShifter.unknown) - - signal_check = not ((CS.leftBlinker or CS.rightBlinker) and CS.vEgo < self.pause_lateral_on_signal and not CS.standstill) - - # Always on lateral - self.FPCC.alwaysOnLateral |= CS.cruiseState.enabled or self.always_on_lateral_main - self.FPCC.alwaysOnLateral &= self.always_on_lateral - self.FPCC.alwaysOnLateral &= CS.cruiseState.available - self.FPCC.alwaysOnLateral &= self.driving_gear - self.FPCC.alwaysOnLateral &= not self.openpilot_crashed - - if self.FPCC.alwaysOnLateral: - self.current_alert_types.append(ET.WARNING) - # Check which actuators can be enabled standstill = CS.vEgo <= max(self.CP.minSteerSpeed, MIN_LATERAL_CONTROL_SPEED) or CS.standstill - CC.latActive = (self.active or self.FPCC.alwaysOnLateral) and signal_check and not CS.steerFaultTemporary and not CS.steerFaultPermanent and \ - (not standstill or self.joystick_mode) and not self.openpilot_crashed - CC.longActive = self.enabled and not self.events.contains(ET.OVERRIDE_LONGITUDINAL) and self.CP.openpilotLongitudinalControl and not self.openpilot_crashed + CC.latActive = (self.active or self.FPCC.alwaysOnLateral) and self.speed_check and not CS.steerFaultTemporary and not CS.steerFaultPermanent and \ + (not standstill or self.joystick_mode) + CC.longActive = self.enabled and not self.events.contains(ET.OVERRIDE_LONGITUDINAL) and self.CP.openpilotLongitudinalControl actuators = CC.actuators actuators.longControlState = self.LoC.long_control_state @@ -939,6 +647,9 @@ class Controls: t_since_plan = (self.sm.frame - self.sm.recv_frame['longitudinalPlan']) * DT_CTRL actuators.accel = self.LoC.update(CC.longActive, CS, long_plan, pid_accel_limits, t_since_plan) + if len(long_plan.speeds): + actuators.speed = long_plan.speeds[-1] + # Steering PID loop and lateral MPC self.desired_curvature = clip_curvature(CS.vEgo, self.desired_curvature, model_v2.action.desiredCurvature) actuators.curvature = self.desired_curvature @@ -979,14 +690,14 @@ class Controls: undershooting = abs(lac_log.desiredLateralAccel) / abs(1e-3 + lac_log.actualLateralAccel) > 1.2 turning = abs(lac_log.desiredLateralAccel) > 1.0 good_speed = CS.vEgo > 5 - max_torque = abs(self.last_actuators.steer) > 0.99 + max_torque = abs(actuators.steer) > 0.99 if undershooting and turning and good_speed and max_torque and not self.random_event_triggered: if self.sm.frame % 10000 == 0 and self.random_events: lac_log.active and self.events.add(EventName.firefoxSteerSaturated) self.params_memory.put_int("CurrentRandomEvent", 1) self.random_event_triggered = True else: - lac_log.active and self.events.add(EventName.frogSteerSaturated if self.goat_scream and self.params_memory.get_int("CurrentHolidayTheme") == 0 else EventName.steerSaturated) + lac_log.active and self.events.add(EventName.goatSteerSaturated if self.goat_scream else EventName.steerSaturated) elif lac_log.saturated: # TODO probably should not use dpath_points but curvature dpath_points = model_v2.position.y @@ -1014,11 +725,21 @@ class Controls: cloudlog.error(f"actuators.{p} not finite {actuators.to_dict()}") setattr(actuators, p, 0.0) + # decrement personality on distance button press + if self.CP.openpilotLongitudinalControl: + if any(not be.pressed and be.type == ButtonType.gapAdjustCruise for be in CS.buttonEvents) or self.onroad_distance_pressed: + if not (self.params_memory.get_bool("DistanceLongPressed") or self.params_memory.get_bool("OnroadDistanceButtonPressed")): + self.personality = (self.personality - 1) % 3 + self.params.put_nonblocking('LongitudinalPersonality', str(self.personality)) + self.onroad_distance_pressed = self.params_memory.get_bool("OnroadDistanceButtonPressed") + return CC, lac_log def publish_logs(self, CS, start_time, CC, lac_log): """Send actuators and hud commands to the car, send controlsstate and MPC logging""" + CO = self.sm['carOutput'] + # Orientation and angle rates can be useful for carcontroller # Only calibrated (car) frame is relevant for the carcontroller orientation_value = list(self.sm['liveLocationKalman'].calibratedOrientationNED.value) @@ -1042,6 +763,7 @@ class Controls: hudControl.speedVisible = self.enabled hudControl.lanesVisible = self.enabled hudControl.leadVisible = self.sm['longitudinalPlan'].hasLead + hudControl.leadDistanceBars = self.personality + 1 hudControl.rightLaneVisible = True hudControl.leftLaneVisible = True @@ -1081,13 +803,12 @@ class Controls: hudControl.visualAlert = current_alert.visual_alert if not self.CP.passive and self.initialized: - self.last_actuators = self.card.controls_update(CC, self.frogpilot_variables) - CC.actuatorsOutput = self.last_actuators + self.card.controls_update(CC, self.frogpilot_variables) if self.CP.steerControlType == car.CarParams.SteerControlType.angle: - self.steer_limited = abs(CC.actuators.steeringAngleDeg - CC.actuatorsOutput.steeringAngleDeg) > \ + self.steer_limited = abs(CC.actuators.steeringAngleDeg - CO.actuatorsOutput.steeringAngleDeg) > \ STEER_ANGLE_SATURATION_THRESHOLD else: - self.steer_limited = abs(CC.actuators.steer - CC.actuatorsOutput.steer) > 1e-2 + self.steer_limited = abs(CC.actuators.steer - CO.actuatorsOutput.steer) > 1e-2 force_decel = (self.sm['driverMonitoringState'].awarenessStatus < 0.) or \ (self.state == State.softDisabling) @@ -1131,6 +852,7 @@ class Controls: controlsState.forceDecel = bool(force_decel) controlsState.canErrorCounter = self.card.can_rcv_cum_timeout_counter controlsState.experimentalMode = self.experimental_mode + controlsState.personality = self.personality lat_tuning = self.CP.lateralTuning.which() if self.joystick_mode: @@ -1144,15 +866,11 @@ class Controls: self.pm.send('controlsState', dat) - car_events = self.events.to_msg() - - self.card.state_publish(car_events) - # onroadEvents - logged every second or on change if (self.sm.frame % int(1. / DT_CTRL) == 0) or (self.events.names != self.events_prev): ce_send = messaging.new_message('onroadEvents', len(self.events)) ce_send.valid = True - ce_send.onroadEvents = car_events + ce_send.onroadEvents = self.events.to_msg() self.pm.send('onroadEvents', ce_send) self.events_prev = self.events.names.copy() @@ -1165,12 +883,6 @@ class Controls: # copy CarControl to pass to CarInterface on the next iteration self.CC = CC - # Publish FrogPilot variables - fpcs_send = messaging.new_message('frogpilotCarControl') - fpcs_send.valid = CS.canValid - fpcs_send.frogpilotCarControl = self.FPCC - self.pm.send('frogpilotCarControl', fpcs_send) - def step(self): start_time = time.monotonic() @@ -1193,22 +905,25 @@ class Controls: self.CS_prev = CS + # Update FrogPilot variables + self.update_frogpilot_variables(CS) + + def read_personality_param(self): + try: + return int(self.params.get('LongitudinalPersonality')) + except (ValueError, TypeError): + return log.LongitudinalPersonality.standard + def params_thread(self, evt): while not evt.is_set(): self.is_metric = self.params.get_bool("IsMetric") - if self.CP.openpilotLongitudinalControl: - if not self.frogpilot_variables.conditional_experimental_mode: - self.experimental_mode = self.params.get_bool("ExperimentalMode") or SpeedLimitController.experimental_mode - else: - self.experimental_mode = False + if self.CP.openpilotLongitudinalControl and not self.frogpilot_variables.conditional_experimental_mode: + self.experimental_mode = self.params.get_bool("ExperimentalMode") or self.speed_limit_controller and SpeedLimitController.experimental_mode + self.personality = self.read_personality_param() if self.CP.notCar: self.joystick_mode = self.params.get_bool("JoystickDebugMode") time.sleep(0.1) - # Update FrogPilot parameters - if self.params_memory.get_bool("FrogPilotTogglesUpdated"): - self.update_frogpilot_params() - def controlsd_thread(self): e = threading.Event() t = threading.Thread(target=self.params_thread, args=(e, )) @@ -1221,12 +936,213 @@ class Controls: e.set() t.join() + def update_frogpilot_events(self, CS): + if self.block_user: + self.events.add(EventName.blockUser) + return + + if self.random_events: + acceleration = CS.aEgo + + if not CS.gasPressed: + self.max_acceleration = max(acceleration, self.max_acceleration) + else: + self.max_acceleration = 0 + + if 3.5 > self.max_acceleration >= 3.0 and acceleration < 1.5: + self.events.add(EventName.accel30) + self.params_memory.put_int("CurrentRandomEvent", 2) + self.random_event_triggered = True + self.max_acceleration = 0 + + elif 4.0 > self.max_acceleration >= 3.5 and acceleration < 1.5: + self.events.add(EventName.accel35) + self.params_memory.put_int("CurrentRandomEvent", 3) + self.random_event_triggered = True + self.max_acceleration = 0 + + elif self.max_acceleration >= 4.0 and acceleration < 1.5: + self.events.add(EventName.accel40) + self.params_memory.put_int("CurrentRandomEvent", 4) + self.random_event_triggered = True + self.max_acceleration = 0 + + if self.green_light_alert: + green_light = not self.sm['frogpilotPlan'].redLight + green_light &= not CS.gasPressed + green_light &= not self.sm['longitudinalPlan'].hasLead + green_light &= self.previously_enabled + green_light &= CS.standstill + + self.green_light_mac.add_data(green_light) + if self.green_light_mac.get_moving_average() >= PROBABILITY: + self.events.add(EventName.greenLight) + + if self.sm.frame >= 1000 and self.holiday_themes and self.params_memory.get_int("CurrentHolidayTheme") != 0 and not self.holiday_theme_alerted: + self.events.add(EventName.holidayActive) + self.holiday_theme_alerted = True + + if self.lead_departing_alert and self.sm.frame % 50 == 0: + lead = self.sm['radarState'].leadOne + lead_distance = lead.dRel + + lead_departing = lead_distance - self.previous_lead_distance > 0.5 and self.previous_lead_distance != 0 and CS.standstill + self.previous_lead_distance = lead_distance + + lead_departing &= not CS.gasPressed + lead_departing &= lead.vLead > 1 + lead_departing &= self.driving_gear + + if lead_departing: + self.events.add(EventName.leadDeparting) + + if not CS.standstill: + if self.sm['modelV2'].meta.turnDirection == Desire.turnLeft: + self.events.add(EventName.turningLeft) + elif self.sm['modelV2'].meta.turnDirection == Desire.turnRight: + self.events.add(EventName.turningRight) + + if os.path.isfile(os.path.join(sentry.CRASHES_DIR, 'error.txt')) and self.crashed_timer < 10: + self.events.add(EventName.openpilotCrashed) + + if self.random_events and not self.openpilot_crashed_triggered: + self.events.add(EventName.openpilotCrashedRandomEvents) + self.openpilot_crashed_triggered = True + + self.crashed_timer += DT_CTRL + + if self.speed_limit_alert or self.speed_limit_confirmation: + current_speed_limit = self.sm['frogpilotPlan'].slcSpeedLimit + desired_speed_limit = self.sm['frogpilotPlan'].unconfirmedSlcSpeedLimit + + speed_limit_changed = desired_speed_limit != self.previous_speed_limit and abs(current_speed_limit - desired_speed_limit) > 1 + + speed_limit_changed_lower = speed_limit_changed and self.previous_speed_limit > desired_speed_limit + speed_limit_changed_higher = speed_limit_changed and self.previous_speed_limit < desired_speed_limit + + self.previous_speed_limit = desired_speed_limit + + if self.CP.pcmCruise and self.FPCC.speedLimitChanged: + if any(be.type == ButtonType.accelCruise for be in CS.buttonEvents): + self.params_memory.put_bool("SLCConfirmed", True) + self.params_memory.put_bool("SLCConfirmedPressed", True) + elif any(be.type == ButtonType.decelCruise for be in CS.buttonEvents): + self.params_memory.put_bool("SLCConfirmed", False) + self.params_memory.put_bool("SLCConfirmedPressed", True) + + if speed_limit_changed_lower: + if self.speed_limit_confirmation_lower: + self.FPCC.speedLimitChanged = True + else: + self.params_memory.put_bool("SLCConfirmed", True) + elif speed_limit_changed_higher: + if self.speed_limit_confirmation_higher: + self.FPCC.speedLimitChanged = True + else: + self.params_memory.put_bool("SLCConfirmed", True) + + if self.params_memory.get_bool("SLCConfirmedPressed") or not self.speed_limit_confirmation: + self.FPCC.speedLimitChanged = False + self.params_memory.put_bool("SLCConfirmedPressed", False) + + if (speed_limit_changed_lower or speed_limit_changed_higher) and self.speed_limit_alert: + self.events.add(EventName.speedLimitChanged) + + if self.FPCC.speedLimitChanged: + self.speed_limit_timer += DT_CTRL + if self.speed_limit_timer >= 10: + self.FPCC.speedLimitChanged = False + self.speed_limit_timer = 0 + else: + self.speed_limit_timer = 0 + else: + self.FPCC.speedLimitChanged = False + + if self.sm.frame == 550 and self.CP.lateralTuning.which() == 'torque' and self.CI.use_nnff: + self.events.add(EventName.torqueNNLoad) + + if self.random_events: + conversion = 1 if self.is_metric else CV.KPH_TO_MPH + v_cruise = max(self.v_cruise_helper.v_cruise_cluster_kph, self.v_cruise_helper.v_cruise_kph) * conversion + + if 70 > v_cruise >= 69: + if not self.vCruise69_alert_played: + self.events.add(EventName.vCruise69) + self.vCruise69_alert_played = True + else: + self.vCruise69_alert_played = False + + def update_frogpilot_variables(self, CS): + self.driving_gear = CS.gearShifter not in (GearShifter.neutral, GearShifter.park, GearShifter.reverse, GearShifter.unknown) + + self.FPCC.alwaysOnLateral |= CS.cruiseState.enabled or self.always_on_lateral_main + self.FPCC.alwaysOnLateral &= CS.cruiseState.available + self.FPCC.alwaysOnLateral &= self.always_on_lateral + self.FPCC.alwaysOnLateral &= self.driving_gear + self.FPCC.alwaysOnLateral &= self.speed_check + self.FPCC.alwaysOnLateral &= not (CS.brakePressed and CS.vEgo < self.always_on_lateral_pause_speed) or CS.standstill + + if self.CP.openpilotLongitudinalControl and self.frogpilot_variables.conditional_experimental_mode: + self.experimental_mode = self.sm['frogpilotPlan'].conditionalExperimental + + self.drive_distance += CS.vEgo * DT_CTRL + self.drive_time += DT_CTRL + + if self.drive_time > 60 and CS.standstill: + current_total_distance = self.params_storage.get_float("FrogPilotKilometers") + distance_to_add = self.drive_distance / 1000 + self.params_storage.put_float_nonblocking("FrogPilotKilometers", current_total_distance + distance_to_add) + self.drive_distance = 0 + + current_total_time = self.params_storage.get_float("FrogPilotMinutes") + time_to_add = self.drive_time / 60 + self.params_storage.put_float_nonblocking("FrogPilotMinutes", current_total_time + time_to_add) + self.drive_time = 0 + + if self.sm.frame * DT_CTRL > 60 * 5 and not self.drive_added: + current_total_drives = self.params_storage.get_int("FrogPilotDrives") + self.params_storage.put_int_nonblocking("FrogPilotDrives", current_total_drives + 1) + self.drive_added = True + + if any(be.pressed and be.type == FrogPilotButtonType.lkas for be in CS.buttonEvents) and self.experimental_mode_via_lkas: + if self.frogpilot_variables.conditional_experimental_mode: + conditional_status = self.params_memory.get_int("CEStatus") + override_value = 0 if conditional_status in {1, 2, 3, 4, 5, 6} else 3 if conditional_status >= 7 else 4 + self.params_memory.put_int("CEStatus", override_value) + else: + self.params.put_bool_nonblocking("ExperimentalMode", not self.experimental_mode) + + self.previously_enabled |= (self.enabled or self.FPCC.alwaysOnLateral) and CS.vEgo > CRUISING_SPEED + self.previously_enabled &= self.driving_gear + + if self.random_event_triggered: + self.random_event_timer += DT_CTRL + if self.random_event_timer >= 4: + self.random_event_triggered = False + self.random_event_timer = 0 + self.params_memory.remove("CurrentRandomEvent") + + signal_check = CS.vEgo >= self.pause_lateral_below_speed or not (CS.leftBlinker or CS.rightBlinker) or CS.standstill + self.speed_check = CS.vEgo >= self.pause_lateral_below_speed or CS.standstill or signal_check and self.pause_lateral_below_signal + + self.FPCC.trafficModeActive = self.frogpilot_variables.traffic_mode and self.params_memory.get_bool("TrafficModeActive") + + fpcc_send = messaging.new_message('frogpilotCarControl') + fpcc_send.valid = CS.canValid + fpcc_send.frogpilotCarControl = self.FPCC + self.pm.send('frogpilotCarControl', fpcc_send) + + if self.params_memory.get_bool("FrogPilotTogglesUpdated"): + self.update_frogpilot_params() + def update_frogpilot_params(self): - self.frogpilot_variables.conditional_experimental_mode = self.params.get_bool("ConditionalExperimental") + self.always_on_lateral_pause_speed = self.always_on_lateral and self.params.get_int("PauseAOLOnBrake") + + self.frogpilot_variables.conditional_experimental_mode = self.CP.openpilotLongitudinalControl and self.params.get_bool("ConditionalExperimental") custom_alerts = self.params.get_bool("CustomAlerts") self.green_light_alert = custom_alerts and self.params.get_bool("GreenLightAlert") - self.lead_departing_alert = custom_alerts and self.params.get_bool("LeadDepartingAlert") + self.lead_departing_alert = not self.radarless_model and custom_alerts and self.params.get_bool("LeadDepartingAlert") self.loud_blindspot_alert = custom_alerts and self.params.get_bool("LoudBlindspotAlert") custom_theme = self.params.get_bool("CustomTheme") @@ -1234,10 +1150,17 @@ class Controls: frog_sounds = custom_sounds == 1 self.goat_scream = frog_sounds and self.params.get_bool("GoatScream") self.holiday_themes = custom_theme and self.params.get_bool("HolidayThemes") + self.random_events = custom_theme and self.params.get_bool("RandomEvents") - experimental_mode_activation = self.params.get_bool("ExperimentalModeActivation") + device_management = self.params.get_bool("DeviceManagement") + self.increase_thermal_limits = device_management and self.params.get_bool("IncreaseThermalLimits") + + experimental_mode_activation = self.CP.openpilotLongitudinalControl and self.params.get_bool("ExperimentalModeActivation") self.frogpilot_variables.experimental_mode_via_distance = experimental_mode_activation and self.params.get_bool("ExperimentalModeViaDistance") - self.frogpilot_variables.experimental_mode_via_lkas = experimental_mode_activation and self.params.get_bool("ExperimentalModeViaLKAS") + self.experimental_mode_via_lkas = experimental_mode_activation and self.params.get_bool("ExperimentalModeViaLKAS") + + lane_detection = self.params.get_bool("NudgelessLaneChange") and self.params.get_int("LaneDetectionWidth") != 0 + self.lane_detection_width = self.params.get_int("LaneDetectionWidth") * (1 if self.is_metric else CV.FOOT_TO_METER) / 10 if lane_detection else 0 lateral_tune = self.params.get_bool("LateralTune") self.force_auto_tune = lateral_tune and self.params.get_float("ForceAutoTune") @@ -1245,28 +1168,23 @@ class Controls: self.steer_ratio = self.params.get_float("SteerRatio") if lateral_tune else stock_steer_ratio self.use_custom_steer_ratio = self.steer_ratio != stock_steer_ratio - self.frogpilot_variables.lock_doors = self.params.get_bool("LockDoors") self.frogpilot_variables.long_pitch = self.params.get_bool("LongPitch") - longitudinal_tune = self.params.get_bool("LongitudinalTune") + longitudinal_tune = self.CP.openpilotLongitudinalControl and self.params.get_bool("LongitudinalTune") self.frogpilot_variables.sport_plus = longitudinal_tune and self.params.get_int("AccelerationProfile") == 3 self.frogpilot_variables.traffic_mode = longitudinal_tune and self.params.get_bool("TrafficMode") - self.lane_detection = self.params.get_bool("LaneDetection") and self.params.get_bool("NudgelessLaneChange") - self.lane_detection_width = self.params.get_int("LaneDetectionWidth") * (1 if self.is_metric else CV.FOOT_TO_METER) / 10 if self.lane_detection else 0 - - self.frogpilot_variables.personalities_via_wheel = self.params.get_bool("PersonalitiesViaWheel") and self.params.get_bool("AdjustablePersonalities") - self.frogpilot_variables.sng_hack = self.params.get_bool("SNGHack") - quality_of_life = self.params.get_bool("QOLControls") - self.pause_lateral_on_signal = self.params.get_int("PauseLateralOnSignal") * (CV.KPH_TO_MS if self.is_metric else CV.MPH_TO_MS) if quality_of_life else 0 + self.frogpilot_variables.custom_cruise_increase = self.params.get_int("CustomCruise") if quality_of_life else 1 + self.frogpilot_variables.custom_cruise_increase_long = self.params.get_int("CustomCruiseLong") if quality_of_life else 5 + self.pause_lateral_below_speed = self.params.get_int("PauseLateralSpeed") * (CV.KPH_TO_MS if self.is_metric else CV.MPH_TO_MS) if quality_of_life else 0 + self.pause_lateral_below_signal = quality_of_life and self.params.get_bool("PauseLateralOnSignal") self.frogpilot_variables.reverse_cruise_increase = quality_of_life and self.params.get_bool("ReverseCruise") self.frogpilot_variables.set_speed_offset = self.params.get_int("SetSpeedOffset") * (1 if self.is_metric else CV.MPH_TO_KPH) if quality_of_life else 0 - self.random_events = self.params.get_bool("RandomEvents") - self.frogpilot_variables.use_ev_tables = self.params.get_bool("EVTable") + self.frogpilot_variables.sng_hack = self.params.get_bool("SNGHack") - self.speed_limit_controller = self.params.get_bool("SpeedLimitController") + self.speed_limit_controller = self.CP.openpilotLongitudinalControl and self.params.get_bool("SpeedLimitController") self.frogpilot_variables.force_mph_dashboard = self.speed_limit_controller and self.params.get_bool("ForceMPHDashboard") self.frogpilot_variables.set_speed_limit = self.speed_limit_controller and self.params.get_bool("SetSpeedLimit") self.speed_limit_alert = self.speed_limit_controller and self.params.get_bool("SpeedLimitChangedAlert") @@ -1274,7 +1192,14 @@ class Controls: self.speed_limit_confirmation_lower = self.speed_limit_confirmation and self.params.get_bool("SLCConfirmationLower") self.speed_limit_confirmation_higher = self.speed_limit_confirmation and self.params.get_bool("SLCConfirmationHigher") + toyota_doors = self.params.get_bool("ToyotaDoors") + self.frogpilot_variables.lock_doors = toyota_doors and self.params.get_bool("LockDoors") + self.frogpilot_variables.unlock_doors = toyota_doors and self.params.get_bool("UnlockDoors") + + self.frogpilot_variables.use_ev_tables = self.params.get_bool("EVTable") + def main(): + config_realtime_process(4, Priority.CTRL_HIGH) controls = Controls() controls.controlsd_thread() diff --git a/selfdrive/controls/lib/alertmanager.py b/selfdrive/controls/lib/alertmanager.py index 6abcf4c..f67e269 100644 --- a/selfdrive/controls/lib/alertmanager.py +++ b/selfdrive/controls/lib/alertmanager.py @@ -3,7 +3,6 @@ import os import json from collections import defaultdict from dataclasses import dataclass -from typing import List, Dict, Optional from openpilot.common.basedir import BASEDIR from openpilot.common.params import Params @@ -14,7 +13,7 @@ with open(os.path.join(BASEDIR, "selfdrive/controls/lib/alerts_offroad.json")) a OFFROAD_ALERTS = json.load(f) -def set_offroad_alert(alert: str, show_alert: bool, extra_text: Optional[str] = None) -> None: +def set_offroad_alert(alert: str, show_alert: bool, extra_text: str = None) -> None: if show_alert: a = copy.copy(OFFROAD_ALERTS[alert]) a['extra'] = extra_text or '' @@ -25,7 +24,7 @@ def set_offroad_alert(alert: str, show_alert: bool, extra_text: Optional[str] = @dataclass class AlertEntry: - alert: Optional[Alert] = None + alert: Alert | None = None start_frame: int = -1 end_frame: int = -1 @@ -34,9 +33,9 @@ class AlertEntry: class AlertManager: def __init__(self): - self.alerts: Dict[str, AlertEntry] = defaultdict(AlertEntry) + self.alerts: dict[str, AlertEntry] = defaultdict(AlertEntry) - def add_many(self, frame: int, alerts: List[Alert]) -> None: + def add_many(self, frame: int, alerts: list[Alert]) -> None: for alert in alerts: entry = self.alerts[alert.alert_type] entry.alert = alert @@ -45,7 +44,7 @@ class AlertManager: min_end_frame = entry.start_frame + alert.duration entry.end_frame = max(frame + 1, min_end_frame) - def process_alerts(self, frame: int, clear_event_types: set) -> Optional[Alert]: + def process_alerts(self, frame: int, clear_event_types: set) -> Alert | None: current_alert = AlertEntry() for v in self.alerts.values(): if not v.alert: diff --git a/selfdrive/controls/lib/desire_helper.py b/selfdrive/controls/lib/desire_helper.py index 52240bb..027a441 100644 --- a/selfdrive/controls/lib/desire_helper.py +++ b/selfdrive/controls/lib/desire_helper.py @@ -37,7 +37,6 @@ TURN_DESIRES = { TurnDirection.turnRight: log.Desire.turnRight, } - class DesireHelper: def __init__(self): self.lane_change_state = LaneChangeState.off @@ -70,7 +69,6 @@ class DesireHelper: lane_available = True else: desired_lane = frogpilotPlan.laneWidthLeft if carstate.leftBlinker else frogpilotPlan.laneWidthRight - # Check if the lane width exceeds the threshold lane_available = desired_lane >= self.lane_detection_width if not lateral_active or self.lane_change_timer > LANE_CHANGE_TIME_MAX: @@ -82,7 +80,6 @@ class DesireHelper: # Set the "turn_completed" flag to prevent lane changes after completing a turn self.turn_completed = True else: - # TurnDirection.turnLeft / turnRight self.turn_direction = TurnDirection.none # LaneChangeState.off @@ -104,19 +101,17 @@ class DesireHelper: blindspot_detected = ((carstate.leftBlindspot and self.lane_change_direction == LaneChangeDirection.left) or (carstate.rightBlindspot and self.lane_change_direction == LaneChangeDirection.right)) - # Conduct a nudgeless lane change if all the conditions are met self.lane_change_wait_timer += DT_MDL if self.nudgeless and lane_available and not self.lane_change_completed and self.lane_change_wait_timer >= self.lane_change_delay: - self.lane_change_wait_timer = 0 torque_applied = True + self.lane_change_wait_timer = 0 if not one_blinker or below_lane_change_speed: self.lane_change_state = LaneChangeState.off self.lane_change_direction = LaneChangeDirection.none elif torque_applied and not blindspot_detected: - # Set the "lane_change_completed" flag to prevent any more lane changes if the toggle is on - self.lane_change_completed = self.one_lane_change self.lane_change_state = LaneChangeState.laneChangeStarting + self.lane_change_completed = True if self.one_lane_change else False # LaneChangeState.laneChangeStarting elif self.lane_change_state == LaneChangeState.laneChangeStarting: @@ -144,10 +139,8 @@ class DesireHelper: else: self.lane_change_timer += DT_MDL - self.prev_one_blinker = one_blinker - - # Reset the flags self.lane_change_completed &= one_blinker + self.prev_one_blinker = one_blinker self.turn_completed &= one_blinker if self.turn_direction != TurnDirection.none: @@ -173,10 +166,11 @@ class DesireHelper: def update_frogpilot_params(self): is_metric = self.params.get_bool("IsMetric") + lateral_tune = self.params.get_bool("LateralTune") + self.turn_desires = lateral_tune and self.params.get_bool("TurnDesires") + self.nudgeless = self.params.get_bool("NudgelessLaneChange") self.lane_change_delay = self.params.get_int("LaneChangeTime") if self.nudgeless else 0 - self.lane_detection = self.nudgeless and self.params.get_bool("LaneDetection") + self.lane_detection = self.nudgeless and self.params.get_int("LaneDetectionWidth") != 0 self.lane_detection_width = self.params.get_int("LaneDetectionWidth") * (1 if is_metric else CV.FOOT_TO_METER) / 10 if self.lane_detection else 0 self.one_lane_change = self.nudgeless and self.params.get_bool("OneLaneChange") - - self.turn_desires = self.params.get_bool("TurnDesires") diff --git a/selfdrive/controls/lib/drive_helpers.py b/selfdrive/controls/lib/drive_helpers.py index 0a8645a..f8c7ac4 100644 --- a/selfdrive/controls/lib/drive_helpers.py +++ b/selfdrive/controls/lib/drive_helpers.py @@ -93,22 +93,18 @@ class VCruiseHelper: long_press = True break - # Reverse the long press value for reverse cruise increase - if frogpilot_variables.reverse_cruise_increase: - long_press = not long_press - if button_type is None: return # Confirm or deny the new speed limit value if speed_limit_changed: if button_type == ButtonType.accelCruise: - self.params_memory.put_bool("SLCConfirmed", True); - self.params_memory.put_bool("SLCConfirmedPressed", True); + self.params_memory.put_bool("SLCConfirmed", True) + self.params_memory.put_bool("SLCConfirmedPressed", True) return elif button_type == ButtonType.decelCruise: - self.params_memory.put_bool("SLCConfirmed", False); - self.params_memory.put_bool("SLCConfirmedPressed", True); + self.params_memory.put_bool("SLCConfirmed", False) + self.params_memory.put_bool("SLCConfirmedPressed", True) return # Don't adjust speed when pressing resume to exit standstill @@ -120,13 +116,13 @@ class VCruiseHelper: if not self.button_change_states[button_type]["enabled"]: return - v_cruise_delta = v_cruise_delta * (5 if long_press else 1) - if long_press and self.v_cruise_kph % v_cruise_delta != 0: # partial interval + v_cruise_delta_interval = frogpilot_variables.custom_cruise_increase_long if long_press else frogpilot_variables.custom_cruise_increase + v_cruise_delta = v_cruise_delta * v_cruise_delta_interval + if v_cruise_delta_interval % 5 == 0 and self.v_cruise_kph % v_cruise_delta != 0: # partial interval self.v_cruise_kph = CRUISE_NEAREST_FUNC[button_type](self.v_cruise_kph / v_cruise_delta) * v_cruise_delta else: self.v_cruise_kph += v_cruise_delta * CRUISE_INTERVAL_SIGN[button_type] - # Apply offset v_cruise_offset = (frogpilot_variables.set_speed_offset * CRUISE_INTERVAL_SIGN[button_type]) if long_press else 0 if v_cruise_offset < 0: v_cruise_offset = frogpilot_variables.set_speed_offset - v_cruise_delta @@ -155,25 +151,20 @@ class VCruiseHelper: if self.CP.pcmCruise: return - if frogpilot_variables.conditional_experimental_mode: - initial = V_CRUISE_INITIAL - else: - initial = V_CRUISE_INITIAL_EXPERIMENTAL_MODE if experimental_mode else V_CRUISE_INITIAL + initial = V_CRUISE_INITIAL_EXPERIMENTAL_MODE if experimental_mode else V_CRUISE_INITIAL # 250kph or above probably means we never had a set speed if any(b.type in (ButtonType.accelCruise, ButtonType.resumeCruise) for b in CS.buttonEvents) and self.v_cruise_kph_last < 250: self.v_cruise_kph = self.v_cruise_kph_last else: - # Initial set speed if desired_speed_limit != 0 and frogpilot_variables.set_speed_limit: - # If there's a known speed limit and the corresponding FP toggle is set, push it to the car - self.v_cruise_kph = desired_speed_limit * CV.MS_TO_KPH + self.v_cruise_kph = int(round(desired_speed_limit * CV.MS_TO_KPH)) else: - # Use fixed initial set speed from mode etc. self.v_cruise_kph = int(round(clip(CS.vEgo * CV.MS_TO_KPH, initial, V_CRUISE_MAX))) self.v_cruise_cluster_kph = self.v_cruise_kph + def apply_deadzone(error, deadzone): if error > deadzone: error -= deadzone diff --git a/selfdrive/controls/lib/events.py b/selfdrive/controls/lib/events.py index 9d94e28..1f59cff 100644 --- a/selfdrive/controls/lib/events.py +++ b/selfdrive/controls/lib/events.py @@ -2,7 +2,7 @@ import math import os from enum import IntEnum -from typing import Dict, Union, Callable, List, Optional +from collections.abc import Callable from cereal import log, car import cereal.messaging as messaging @@ -12,6 +12,7 @@ from openpilot.common.realtime import DT_CTRL from openpilot.selfdrive.locationd.calibrationd import MIN_SPEED_FILTER from openpilot.system.version import get_short_branch +params = Params() params_memory = Params("/dev/shm/params") AlertSize = log.ControlsState.AlertSize @@ -51,12 +52,12 @@ EVENT_NAME = {v: k for k, v in EventName.schema.enumerants.items()} class Events: def __init__(self): - self.events: List[int] = [] - self.static_events: List[int] = [] + self.events: list[int] = [] + self.static_events: list[int] = [] self.events_prev = dict.fromkeys(EVENTS.keys(), 0) @property - def names(self) -> List[int]: + def names(self) -> list[int]: return self.events def __len__(self) -> int: @@ -74,7 +75,7 @@ class Events: def contains(self, event_type: str) -> bool: return any(event_type in EVENTS.get(e, {}) for e in self.events) - def create_alerts(self, event_types: List[str], callback_args=None): + def create_alerts(self, event_types: list[str], callback_args=None): if callback_args is None: callback_args = [] @@ -135,7 +136,7 @@ class Alert: self.creation_delay = creation_delay self.alert_type = "" - self.event_type: Optional[str] = None + self.event_type: str | None = None def __str__(self) -> str: return f"{self.alert_text_1}/{self.alert_text_2} {self.priority} {self.visual_alert} {self.audible_alert}" @@ -261,15 +262,14 @@ def no_gps_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, m AlertStatus.normal, AlertSize.mid, Priority.LOWER, VisualAlert.none, AudibleAlert.none, .2, creation_delay=300.) - def torque_nn_load_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: - model_name = params_memory.get("NNFFModelName") + model_name = params.get("NNFFModelName", encoding='utf-8') if model_name == "": return Alert( "NNFF Torque Controller not available", - "Donate logs to Twilsonco to get it added!", + "Donate logs to Twilsonco to get your car supported!", AlertStatus.userPrompt, AlertSize.mid, - Priority.LOW, VisualAlert.none, AudibleAlert.prompt, 6.0) + Priority.LOW, VisualAlert.none, AudibleAlert.prompt, 5.0) else: return Alert( "NNFF Torque Controller loaded", @@ -379,13 +379,14 @@ def holiday_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, def no_lane_available_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: lane_width = sm['frogpilotPlan'].laneWidthLeft if CS.leftBlinker else sm['frogpilotPlan'].laneWidthRight lane_width_msg = f"{lane_width:.1f} meters" if metric else f"{lane_width * CV.METER_TO_FOOT:.1f} feet" + return Alert( "No lane available", f"Detected lane width is only {lane_width_msg}", AlertStatus.normal, AlertSize.mid, Priority.LOWEST, VisualAlert.none, AudibleAlert.none, .2) -EVENTS: Dict[int, Dict[str, Union[Alert, AlertCallbackType]]] = { +EVENTS: dict[int, dict[str, Alert | AlertCallbackType]] = { # ********** events with no alerts ********** EventName.stockFcw: {}, @@ -1009,7 +1010,13 @@ EVENTS: Dict[int, Dict[str, Union[Alert, AlertCallbackType]]] = { }, # FrogPilot Events - EventName.frogSteerSaturated: { + EventName.blockUser: { + ET.PERMANENT: NormalPermanentAlert("Dashcam mode", + "Please don't use the 'Development' branch!", + priority=Priority.HIGHEST), + }, + + EventName.goatSteerSaturated: { ET.WARNING: Alert( "Turn Exceeds Steering Limit", "JESUS TAKE THE WHEEL!!", @@ -1110,6 +1117,14 @@ EVENTS: Dict[int, Dict[str, Union[Alert, AlertCallbackType]]] = { Priority.LOW, VisualAlert.none, AudibleAlert.nessie, 4.), }, + EventName.accel40: { + ET.WARNING: Alert( + "Great Scott!", + "🚗💨", + AlertStatus.frogpilot, AlertSize.mid, + Priority.LOW, VisualAlert.none, AudibleAlert.doc, 4.), + }, + EventName.firefoxSteerSaturated: { ET.WARNING: Alert( "Turn Exceeds Steering Limit", @@ -1137,7 +1152,7 @@ EVENTS: Dict[int, Dict[str, Union[Alert, AlertCallbackType]]] = { EventName.yourFrogTriedToKillMe: { ET.PERMANENT: Alert( "Your frog tried to kill me...", - "😡", + "👺", AlertStatus.frogpilot, AlertSize.mid, Priority.MID, VisualAlert.none, AudibleAlert.angry, 5.), }, @@ -1150,7 +1165,7 @@ if __name__ == '__main__': from collections import defaultdict event_names = {v: k for k, v in EventName.schema.enumerants.items()} - alerts_by_type: Dict[str, Dict[Priority, List[str]]] = defaultdict(lambda: defaultdict(list)) + alerts_by_type: dict[str, dict[Priority, list[str]]] = defaultdict(lambda: defaultdict(list)) CP = car.CarParams.new_message() CS = car.CarState.new_message() @@ -1162,7 +1177,7 @@ if __name__ == '__main__': alert = alert(CP, CS, sm, False, 1) alerts_by_type[et][alert.priority].append(event_names[i]) - all_alerts: Dict[str, List[tuple[Priority, List[str]]]] = {} + all_alerts: dict[str, list[tuple[Priority, list[str]]]] = {} for et, priority_alerts in alerts_by_type.items(): all_alerts[et] = sorted(priority_alerts.items(), key=lambda x: x[0], reverse=True) diff --git a/selfdrive/controls/lib/latcontrol_torque.py b/selfdrive/controls/lib/latcontrol_torque.py index ea07586..5402698 100644 --- a/selfdrive/controls/lib/latcontrol_torque.py +++ b/selfdrive/controls/lib/latcontrol_torque.py @@ -73,10 +73,10 @@ class LatControlTorque(LatControl): self.steering_angle_deadzone_deg = self.torque_params.steeringAngleDeadzoneDeg # Twilsonco's Lateral Neural Network Feedforward - self.use_nn = CI.has_lateral_torque_nn - self.use_lateral_jerk = CI.use_lateral_jerk + self.use_nnff = CI.use_nnff + self.use_nnff_lite = CI.use_nnff_lite - if self.use_nn or self.use_lateral_jerk: + if self.use_nnff or self.use_nnff_lite: # Instantaneous lateral jerk changes very rapidly, making it not useful on its own, # however, we can "look ahead" to the future planned lateral jerk in order to guage # whether the current desired lateral jerk will persist into the future, i.e. @@ -96,7 +96,7 @@ class LatControlTorque(LatControl): self.t_diffs = np.diff(ModelConstants.T_IDXS) self.desired_lat_jerk_time = CP.steerActuatorDelay + 0.3 - if self.use_nn: + if self.use_nnff: self.pitch = FirstOrderFilter(0.0, 0.5, 0.01) # NN model takes current v_ego, lateral_accel, lat accel/jerk error, roll, and past/future/planned data # of lat accel and roll @@ -137,7 +137,7 @@ class LatControlTorque(LatControl): if self.use_steering_angle: actual_curvature = actual_curvature_vm curvature_deadzone = abs(VM.calc_curvature(math.radians(self.steering_angle_deadzone_deg), CS.vEgo, 0.0)) - if self.use_nn or self.use_lateral_jerk: + if self.use_nnff or self.use_nnff_lite: actual_curvature_rate = -VM.calc_curvature(math.radians(CS.steeringRateDeg), CS.vEgo, 0.0) actual_lateral_jerk = actual_curvature_rate * CS.vEgo ** 2 else: @@ -152,14 +152,14 @@ class LatControlTorque(LatControl): actual_lateral_accel = actual_curvature * CS.vEgo ** 2 lateral_accel_deadzone = curvature_deadzone * CS.vEgo ** 2 - low_speed_factor = interp(CS.vEgo, LOW_SPEED_X, LOW_SPEED_Y if not self.use_nn else LOW_SPEED_Y_NN)**2 + low_speed_factor = interp(CS.vEgo, LOW_SPEED_X, LOW_SPEED_Y if not self.use_nnff else LOW_SPEED_Y_NN)**2 setpoint = desired_lateral_accel + low_speed_factor * desired_curvature measurement = actual_lateral_accel + low_speed_factor * actual_curvature - + lateral_jerk_setpoint = 0 lateral_jerk_measurement = 0 - if self.use_nn or self.use_lateral_jerk: + if self.use_nnff or self.use_nnff_lite: # prepare "look-ahead" desired lateral jerk lat_accel_friction_factor = self.lat_accel_friction_factor if len(model_data.acceleration.y) == ModelConstants.IDX_N: @@ -180,7 +180,7 @@ class LatControlTorque(LatControl): lookahead_lateral_jerk = 0.0 model_good = model_data is not None and len(model_data.orientation.x) >= CONTROL_N - if self.use_nn and model_good: + if self.use_nnff and model_good: # update past data roll = params.roll pitch = self.pitch.update(llk.calibratedOrientationNED.value[1]) if len(llk.calibratedOrientationNED.value) > 1 else 0.0 @@ -219,17 +219,17 @@ class LatControlTorque(LatControl): # apply friction override for cars with low NN friction response if self.nn_friction_override: pid_log.error += self.torque_from_lateral_accel(LatControlInputs(0.0, 0.0, CS.vEgo, CS.aEgo), self.torque_params, - friction_input, lateral_accel_deadzone, friction_compensation=True, gravity_adjusted=False) + friction_input, lateral_accel_deadzone, friction_compensation=True, gravity_adjusted=False) nn_log = nn_input + nnff_setpoint_input + nnff_measurement_input else: gravity_adjusted_lateral_accel = desired_lateral_accel - roll_compensation torque_from_setpoint = self.torque_from_lateral_accel(LatControlInputs(setpoint, roll_compensation, CS.vEgo, CS.aEgo), self.torque_params, - lateral_jerk_setpoint, lateral_accel_deadzone, friction_compensation=self.use_lateral_jerk, gravity_adjusted=False) + lateral_jerk_setpoint, lateral_accel_deadzone, friction_compensation=self.use_nnff_lite, gravity_adjusted=False) torque_from_measurement = self.torque_from_lateral_accel(LatControlInputs(measurement, roll_compensation, CS.vEgo, CS.aEgo), self.torque_params, - lateral_jerk_measurement, lateral_accel_deadzone, friction_compensation=self.use_lateral_jerk, gravity_adjusted=False) + lateral_jerk_measurement, lateral_accel_deadzone, friction_compensation=self.use_nnff_lite, gravity_adjusted=False) pid_log.error = torque_from_setpoint - torque_from_measurement error = desired_lateral_accel - actual_lateral_accel - if self.use_lateral_jerk: + if self.use_nnff_lite: friction_input = lat_accel_friction_factor * error + self.lat_jerk_friction_factor * lookahead_lateral_jerk else: friction_input = error diff --git a/selfdrive/controls/lib/longcontrol.py b/selfdrive/controls/lib/longcontrol.py index ee65c4a..761d978 100644 --- a/selfdrive/controls/lib/longcontrol.py +++ b/selfdrive/controls/lib/longcontrol.py @@ -1,5 +1,6 @@ from cereal import car from openpilot.common.numpy_fast import clip, interp +from openpilot.common.params import Params from openpilot.common.realtime import DT_CTRL from openpilot.selfdrive.controls.lib.drive_helpers import CONTROL_N, apply_deadzone from openpilot.selfdrive.controls.lib.pid import PIDController @@ -60,6 +61,12 @@ class LongControl: self.v_pid = 0.0 self.last_output_accel = 0.0 + # FrogPilot variables + self.params = Params() + self.params_memory = Params("/dev/shm/params") + + self.update_frogpilot_params() + def reset(self, v_pid): """Reset PID controller and change setpoint""" self.pid.reset() @@ -129,4 +136,20 @@ class LongControl: self.last_output_accel = clip(output_accel, accel_limits[0], accel_limits[1]) + if self.params_memory.get_bool("FrogPilotTogglesUpdated"): + self.update_frogpilot_params() + return self.last_output_accel + + def update_frogpilot_params(self): + kiV = [self.params.get_float("kiV1"), self.params.get_float("kiV2"), self.params.get_float("kiV3"), self.params.get_float("kiV4")] + kpV = [self.params.get_float("kpV1"), self.params.get_float("kpV2"), self.params.get_float("kpV3"), self.params.get_float("kpV4")] + + if self.params.get_bool("Tuning"): + self.pid = PIDController((self.CP.longitudinalTuning.kpBP, kpV), + (self.CP.longitudinalTuning.kiBP, kiV), + k_f=self.CP.longitudinalTuning.kf, rate=1 / DT_CTRL) + else: + self.pid = PIDController((self.CP.longitudinalTuning.kpBP, self.CP.longitudinalTuning.kpV), + (self.CP.longitudinalTuning.kiBP, self.CP.longitudinalTuning.kiV), + k_f=self.CP.longitudinalTuning.kf, rate=1 / DT_CTRL) diff --git a/selfdrive/controls/lib/longitudinal_planner.py b/selfdrive/controls/lib/longitudinal_planner.py old mode 100755 new mode 100644 index d423a62..6880f56 --- a/selfdrive/controls/lib/longitudinal_planner.py +++ b/selfdrive/controls/lib/longitudinal_planner.py @@ -2,35 +2,36 @@ import math import numpy as np from openpilot.common.numpy_fast import clip, interp -from openpilot.common.params import Params -from cereal import log import cereal.messaging as messaging from openpilot.common.conversions import Conversions as CV from openpilot.common.filter_simple import FirstOrderFilter +from openpilot.common.params import Params +from openpilot.common.simple_kalman import KF1D from openpilot.common.realtime import DT_MDL +from openpilot.common.swaglog import cloudlog from openpilot.selfdrive.modeld.constants import ModelConstants from openpilot.selfdrive.car.interfaces import ACCEL_MIN, ACCEL_MAX from openpilot.selfdrive.controls.lib.longcontrol import LongCtrlState from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import LongitudinalMpc -from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import T_IDXS as T_IDXS_MPC +from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import T_IDXS as T_IDXS_MPC, LEAD_ACCEL_TAU from openpilot.selfdrive.controls.lib.drive_helpers import V_CRUISE_MAX, CONTROL_N, get_speed_error -from openpilot.common.swaglog import cloudlog +from openpilot.system.version import get_short_branch -from openpilot.selfdrive.frogpilot.functions.frogpilot_functions import CRUISING_SPEED +from openpilot.selfdrive.frogpilot.controls.lib.model_manager import RADARLESS_MODELS LON_MPC_STEP = 0.2 # first step is 0.2s A_CRUISE_MIN = -1.2 A_CRUISE_MAX_VALS = [1.6, 1.2, 0.8, 0.6] A_CRUISE_MAX_BP = [0., 10.0, 25., 40.] -ACCEL_MAX_PLUS = [ACCEL_MAX, 4.0] -ACCEL_MAX_PLUS_BP = [0., CRUISING_SPEED] - # Lookup table for turns _A_TOTAL_MAX_V = [1.7, 3.2] _A_TOTAL_MAX_BP = [20., 40.] +# Kalman filter states enum +LEAD_KALMAN_SPEED, LEAD_KALMAN_ACCEL = 0, 1 + def get_max_accel(v_ego): return interp(v_ego, A_CRUISE_MAX_BP, A_CRUISE_MAX_VALS) @@ -51,6 +52,72 @@ def limit_accel_in_turns(v_ego, angle_steers, a_target, CP): return [a_target[0], min(a_target[1], a_x_allowed)] +def lead_kf(v_lead: float, dt: float = 0.05): + # Lead Kalman Filter params, calculating K from A, C, Q, R requires the control library. + # hardcoding a lookup table to compute K for values of radar_ts between 0.01s and 0.2s + assert dt > .01 and dt < .2, "Radar time step must be between .01s and 0.2s" + A = [[1.0, dt], [0.0, 1.0]] + C = [1.0, 0.0] + #Q = np.matrix([[10., 0.0], [0.0, 100.]]) + #R = 1e3 + #K = np.matrix([[ 0.05705578], [ 0.03073241]]) + dts = [dt * 0.01 for dt in range(1, 21)] + K0 = [0.12287673, 0.14556536, 0.16522756, 0.18281627, 0.1988689, 0.21372394, + 0.22761098, 0.24069424, 0.253096, 0.26491023, 0.27621103, 0.28705801, + 0.29750003, 0.30757767, 0.31732515, 0.32677158, 0.33594201, 0.34485814, + 0.35353899, 0.36200124] + K1 = [0.29666309, 0.29330885, 0.29042818, 0.28787125, 0.28555364, 0.28342219, + 0.28144091, 0.27958406, 0.27783249, 0.27617149, 0.27458948, 0.27307714, + 0.27162685, 0.27023228, 0.26888809, 0.26758976, 0.26633338, 0.26511557, + 0.26393339, 0.26278425] + K = [[interp(dt, dts, K0)], [interp(dt, dts, K1)]] + + kf = KF1D([[v_lead], [0.0]], A, C, K) + return kf + + +class Lead: + def __init__(self): + self.dRel = 0.0 + self.yRel = 0.0 + self.vLead = 0.0 + self.aLead = 0.0 + self.vLeadK = 0.0 + self.aLeadK = 0.0 + self.aLeadTau = LEAD_ACCEL_TAU + self.prob = 0.0 + self.status = False + + self.kf: KF1D | None = None + + def reset(self): + self.status = False + self.kf = None + self.aLeadTau = LEAD_ACCEL_TAU + + def update(self, dRel: float, yRel: float, vLead: float, aLead: float, prob: float): + self.dRel = dRel + self.yRel = yRel + self.vLead = vLead + self.aLead = aLead + self.prob = prob + self.status = True + + if self.kf is None: + self.kf = lead_kf(self.vLead) + else: + self.kf.update(self.vLead) + + self.vLeadK = float(self.kf.x[LEAD_KALMAN_SPEED][0]) + self.aLeadK = float(self.kf.x[LEAD_KALMAN_ACCEL][0]) + + # Learn if constant acceleration + if abs(self.aLeadK) < 0.5: + self.aLeadTau = LEAD_ACCEL_TAU + else: + self.aLeadTau *= 0.9 + + class LongitudinalPlanner: def __init__(self, CP, init_v=0.0, init_a=0.0, dt=DT_MDL): self.CP = CP @@ -62,23 +129,26 @@ class LongitudinalPlanner: self.v_desired_filter = FirstOrderFilter(init_v, 2.0, self.dt) self.v_model_error = 0.0 + self.lead_one = Lead() + self.lead_two = Lead() + self.v_desired_trajectory = np.zeros(CONTROL_N) self.a_desired_trajectory = np.zeros(CONTROL_N) self.j_desired_trajectory = np.zeros(CONTROL_N) self.solverExecutionTime = 0.0 - self.params = Params() - self.param_read_counter = 0 - self.read_param() - self.personality = log.LongitudinalPersonality.standard - def read_param(self): - try: - self.personality = int(self.params.get('LongitudinalPersonality')) - except (ValueError, TypeError): - self.personality = log.LongitudinalPersonality.standard + # FrogPilot variables + self.params = Params() + self.params_memory = Params("/dev/shm/params") + + self.radarless_model = self.params.get("Model", encoding='utf-8') in RADARLESS_MODELS + + self.release = get_short_branch() == "FrogPilot" + + self.update_frogpilot_params() @staticmethod - def parse_model(model_msg, model_error): + def parse_model(model_msg, model_error, v_ego, taco_tune): if (len(model_msg.position.x) == 33 and len(model_msg.velocity.x) == 33 and len(model_msg.acceleration.x) == 33): @@ -91,12 +161,16 @@ class LongitudinalPlanner: v = np.zeros(len(T_IDXS_MPC)) a = np.zeros(len(T_IDXS_MPC)) j = np.zeros(len(T_IDXS_MPC)) + + if taco_tune: + max_lat_accel = interp(v_ego, [5, 10, 20], [1.5, 2.0, 3.0]) + curvatures = np.interp(T_IDXS_MPC, ModelConstants.T_IDXS, model_msg.orientationRate.z) / np.clip(v, 0.3, 100.0) + max_v = np.sqrt(max_lat_accel / (np.abs(curvatures) + 1e-3)) - 2.0 + v = np.minimum(max_v, v) + return x, v, a, j - def update(self, sm, frogpilot_planner, params_memory): - if self.param_read_counter % 50 == 0 or params_memory.get_bool("PersonalityChangedViaUI") or params_memory.get_bool("PersonalityChangedViaWheel"): - self.read_param() - self.param_read_counter += 1 + def update(self, sm): self.mpc.mode = 'blended' if sm['controlsState'].experimentalMode else 'acc' v_ego = sm['carState'].vEgo @@ -112,7 +186,7 @@ class LongitudinalPlanner: # No change cost when user is controlling the speed, or when standstill prev_accel_constraint = not (reset_state or sm['carState'].standstill) - accel_limits = frogpilot_planner.accel_limits + accel_limits = [sm['frogpilotPlan'].minAcceleration, sm['frogpilotPlan'].maxAcceleration] if self.mpc.mode == 'acc': accel_limits_turns = limit_accel_in_turns(v_ego, sm['carState'].steeringAngleDeg, accel_limits, self.CP) else: @@ -120,10 +194,8 @@ class LongitudinalPlanner: if reset_state: self.v_desired_filter.x = v_ego - # Slowly ramp up the acceleration to prevent jerky behaviors from a standstill - max_acceleration = min(interp(v_ego, ACCEL_MAX_PLUS_BP, ACCEL_MAX_PLUS), accel_limits[1]) # Clip aEgo to cruise limits to prevent large accelerations when becoming active - self.a_desired = clip(sm['carState'].aEgo, accel_limits[0], max_acceleration) + self.a_desired = clip(sm['carState'].aEgo, accel_limits[0], accel_limits[1]) # Prevent divergence, smooth in current v_ego self.v_desired_filter.x = max(0.0, self.v_desired_filter.update(v_ego)) @@ -136,13 +208,26 @@ class LongitudinalPlanner: accel_limits_turns[0] = min(accel_limits_turns[0], self.a_desired + 0.05) accel_limits_turns[1] = max(accel_limits_turns[1], self.a_desired - 0.05) - self.mpc.set_weights(prev_accel_constraint, - frogpilot_planner.custom_personalities, frogpilot_planner.aggressive_jerk, frogpilot_planner.standard_jerk, frogpilot_planner.relaxed_jerk, - personality=self.personality) + if self.radarless_model: + model_leads = list(sm['modelV2'].leadsV3) + # TODO lead state should be invalidated if its different point than the previous one + lead_states = [self.lead_one, self.lead_two] + for index in range(len(lead_states)): + if len(model_leads) > index: + model_lead = model_leads[index] + lead_states[index].update(model_lead.x[0], model_lead.y[0], model_lead.v[0], model_lead.a[0], model_lead.prob) + else: + lead_states[index].reset() + else: + self.lead_one = sm['radarState'].leadOne + self.lead_two = sm['radarState'].leadTwo + + self.mpc.set_weights(sm['frogpilotPlan'].jerk, prev_accel_constraint, personality=sm['controlsState'].personality) self.mpc.set_accel_limits(accel_limits_turns[0], accel_limits_turns[1]) self.mpc.set_cur_state(self.v_desired_filter.x, self.a_desired) - x, v, a, j = self.parse_model(sm['modelV2'], self.v_model_error) - self.mpc.update(sm['radarState'], frogpilot_planner.v_cruise, x, v, a, j, frogpilot_planner, personality=self.personality) + x, v, a, j = self.parse_model(sm['modelV2'], self.v_model_error, v_ego, self.taco_tune) + self.mpc.update(self.lead_one, self.lead_two, sm['frogpilotPlan'].vCruise, x, v, a, j, self.radarless_model, sm['frogpilotPlan'].tFollow, + sm['frogpilotCarControl'].trafficModeActive, personality=sm['controlsState'].personality) self.v_desired_trajectory_full = np.interp(ModelConstants.T_IDXS, T_IDXS_MPC, self.mpc.v_solution) self.a_desired_trajectory_full = np.interp(ModelConstants.T_IDXS, T_IDXS_MPC, self.mpc.a_solution) @@ -160,9 +245,14 @@ class LongitudinalPlanner: self.a_desired = float(interp(self.dt, ModelConstants.T_IDXS[:CONTROL_N], self.a_desired_trajectory)) self.v_desired_filter.x = self.v_desired_filter.x + self.dt * (self.a_desired + a_prev) / 2.0 - frogpilot_planner.update(sm['carState'], sm['controlsState'], sm['modelV2'], self.mpc, sm, v_cruise, v_ego) + if self.params_memory.get_bool("FrogPilotTogglesUpdated"): + self.update_frogpilot_params() - def publish(self, sm, pm, frogpilot_planner): + def update_frogpilot_params(self): + lateral_tune = self.params.get_bool("LateralTune") + self.taco_tune = lateral_tune and self.params.get_bool("TacoTune") and not self.release + + def publish(self, sm, pm): plan_send = messaging.new_message('longitudinalPlan') plan_send.valid = sm.all_checks(service_list=['carState', 'controlsState']) @@ -175,13 +265,10 @@ class LongitudinalPlanner: longitudinalPlan.accels = self.a_desired_trajectory.tolist() longitudinalPlan.jerks = self.j_desired_trajectory.tolist() - longitudinalPlan.hasLead = sm['radarState'].leadOne.status + longitudinalPlan.hasLead = self.lead_one.status longitudinalPlan.longitudinalPlanSource = self.mpc.source longitudinalPlan.fcw = self.fcw longitudinalPlan.solverExecutionTime = self.mpc.solve_time - longitudinalPlan.personality = self.personality pm.send('longitudinalPlan', plan_send) - - frogpilot_planner.publish(sm, pm, self.mpc) diff --git a/selfdrive/controls/lib/tests/__init__.py b/selfdrive/controls/lib/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/selfdrive/controls/lib/tests/test_alertmanager.py b/selfdrive/controls/lib/tests/test_alertmanager.py new file mode 100644 index 0000000..dbd4285 --- /dev/null +++ b/selfdrive/controls/lib/tests/test_alertmanager.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 +import random +import unittest + +from openpilot.selfdrive.controls.lib.events import Alert, EVENTS +from openpilot.selfdrive.controls.lib.alertmanager import AlertManager + + +class TestAlertManager(unittest.TestCase): + + def test_duration(self): + """ + Enforce that an alert lasts for max(alert duration, duration the alert is added) + """ + for duration in range(1, 100): + alert = None + while not isinstance(alert, Alert): + event = random.choice([e for e in EVENTS.values() if len(e)]) + alert = random.choice(list(event.values())) + + alert.duration = duration + + # check two cases: + # - alert is added to AM for <= the alert's duration + # - alert is added to AM for > alert's duration + for greater in (True, False): + if greater: + add_duration = duration + random.randint(1, 10) + else: + add_duration = random.randint(1, duration) + show_duration = max(duration, add_duration) + + AM = AlertManager() + for frame in range(duration+10): + if frame < add_duration: + AM.add_many(frame, [alert, ]) + current_alert = AM.process_alerts(frame, {}) + + shown = current_alert is not None + should_show = frame <= show_duration + self.assertEqual(shown, should_show, msg=f"{frame=} {add_duration=} {duration=}") + + +if __name__ == "__main__": + unittest.main() diff --git a/selfdrive/controls/lib/tests/test_latcontrol.py b/selfdrive/controls/lib/tests/test_latcontrol.py new file mode 100644 index 0000000..866270c --- /dev/null +++ b/selfdrive/controls/lib/tests/test_latcontrol.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 +import unittest + +from parameterized import parameterized + +from cereal import car, log +from openpilot.selfdrive.car.car_helpers import interfaces +from openpilot.selfdrive.car.honda.values import CAR as HONDA +from openpilot.selfdrive.car.toyota.values import CAR as TOYOTA +from openpilot.selfdrive.car.nissan.values import CAR as NISSAN +from openpilot.selfdrive.controls.lib.latcontrol_pid import LatControlPID +from openpilot.selfdrive.controls.lib.latcontrol_torque import LatControlTorque +from openpilot.selfdrive.controls.lib.latcontrol_angle import LatControlAngle +from openpilot.selfdrive.controls.lib.vehicle_model import VehicleModel +from openpilot.common.mock.generators import generate_liveLocationKalman + + +class TestLatControl(unittest.TestCase): + + @parameterized.expand([(HONDA.CIVIC, LatControlPID), (TOYOTA.RAV4, LatControlTorque), (NISSAN.LEAF, LatControlAngle)]) + def test_saturation(self, car_name, controller): + CarInterface, CarController, CarState = interfaces[car_name] + CP = CarInterface.get_non_essential_params(car_name) + CI = CarInterface(CP, CarController, CarState) + VM = VehicleModel(CP) + + controller = controller(CP, CI) + + CS = car.CarState.new_message() + CS.vEgo = 30 + CS.steeringPressed = False + + params = log.LiveParametersData.new_message() + + llk = generate_liveLocationKalman() + for _ in range(1000): + _, _, lac_log = controller.update(True, CS, VM, params, False, 1, llk) + + self.assertTrue(lac_log.saturated) + + +if __name__ == "__main__": + unittest.main() diff --git a/selfdrive/controls/lib/tests/test_vehicle_model.py b/selfdrive/controls/lib/tests/test_vehicle_model.py new file mode 100644 index 0000000..d016e87 --- /dev/null +++ b/selfdrive/controls/lib/tests/test_vehicle_model.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python3 +import math +import unittest + +import numpy as np +from control import StateSpace + +from openpilot.selfdrive.car.honda.interface import CarInterface +from openpilot.selfdrive.car.honda.values import CAR +from openpilot.selfdrive.controls.lib.vehicle_model import VehicleModel, dyn_ss_sol, create_dyn_state_matrices + + +class TestVehicleModel(unittest.TestCase): + def setUp(self): + CP = CarInterface.get_non_essential_params(CAR.CIVIC) + self.VM = VehicleModel(CP) + + def test_round_trip_yaw_rate(self): + # TODO: fix VM to work at zero speed + for u in np.linspace(1, 30, num=10): + for roll in np.linspace(math.radians(-20), math.radians(20), num=11): + for sa in np.linspace(math.radians(-20), math.radians(20), num=11): + yr = self.VM.yaw_rate(sa, u, roll) + new_sa = self.VM.get_steer_from_yaw_rate(yr, u, roll) + + self.assertAlmostEqual(sa, new_sa) + + def test_dyn_ss_sol_against_yaw_rate(self): + """Verify that the yaw_rate helper function matches the results + from the state space model.""" + + for roll in np.linspace(math.radians(-20), math.radians(20), num=11): + for u in np.linspace(1, 30, num=10): + for sa in np.linspace(math.radians(-20), math.radians(20), num=11): + + # Compute yaw rate based on state space model + _, yr1 = dyn_ss_sol(sa, u, roll, self.VM) + + # Compute yaw rate using direct computations + yr2 = self.VM.yaw_rate(sa, u, roll) + self.assertAlmostEqual(float(yr1[0]), yr2) + + def test_syn_ss_sol_simulate(self): + """Verifies that dyn_ss_sol matches a simulation""" + + for roll in np.linspace(math.radians(-20), math.radians(20), num=11): + for u in np.linspace(1, 30, num=10): + A, B = create_dyn_state_matrices(u, self.VM) + + # Convert to discrete time system + ss = StateSpace(A, B, np.eye(2), np.zeros((2, 2))) + ss = ss.sample(0.01) + + for sa in np.linspace(math.radians(-20), math.radians(20), num=11): + inp = np.array([[sa], [roll]]) + + # Simulate for 1 second + x1 = np.zeros((2, 1)) + for _ in range(100): + x1 = ss.A @ x1 + ss.B @ inp + + # Compute steady state solution directly + x2 = dyn_ss_sol(sa, u, roll, self.VM) + + np.testing.assert_almost_equal(x1, x2, decimal=3) + + + +if __name__ == "__main__": + unittest.main() diff --git a/selfdrive/controls/lib/vehicle_model.py b/selfdrive/controls/lib/vehicle_model.py index 0750384..b6f50b4 100755 --- a/selfdrive/controls/lib/vehicle_model.py +++ b/selfdrive/controls/lib/vehicle_model.py @@ -12,7 +12,6 @@ x_dot = A*x + B*u A depends on longitudinal speed, u [m/s], and vehicle parameters CP """ -from typing import Tuple import numpy as np from numpy.linalg import solve @@ -169,7 +168,7 @@ def kin_ss_sol(sa: float, u: float, VM: VehicleModel) -> np.ndarray: return K * sa -def create_dyn_state_matrices(u: float, VM: VehicleModel) -> Tuple[np.ndarray, np.ndarray]: +def create_dyn_state_matrices(u: float, VM: VehicleModel) -> tuple[np.ndarray, np.ndarray]: """Returns the A and B matrix for the dynamics system Args: diff --git a/selfdrive/controls/plannerd.py b/selfdrive/controls/plannerd.py index 6e6a20a..71dc5fe 100755 --- a/selfdrive/controls/plannerd.py +++ b/selfdrive/controls/plannerd.py @@ -6,8 +6,6 @@ from openpilot.common.swaglog import cloudlog from openpilot.selfdrive.controls.lib.longitudinal_planner import LongitudinalPlanner import cereal.messaging as messaging -from openpilot.selfdrive.frogpilot.functions.frogpilot_planner import FrogPilotPlanner - def publish_ui_plan(sm, pm, longitudinal_planner): ui_send = messaging.new_message('uiPlan') ui_send.valid = sm.all_checks(service_list=['carState', 'controlsState', 'modelV2']) @@ -24,27 +22,22 @@ def plannerd_thread(): cloudlog.info("plannerd is waiting for CarParams") params = Params() - params_memory = Params("/dev/shm/params") with car.CarParams.from_bytes(params.get("CarParams", block=True)) as msg: CP = msg cloudlog.info("plannerd got CarParams: %s", CP.carName) - frogpilot_planner = FrogPilotPlanner(CP, params, params_memory) longitudinal_planner = LongitudinalPlanner(CP) - pm = messaging.PubMaster(['longitudinalPlan', 'uiPlan', 'frogpilotPlan']) - sm = messaging.SubMaster(['carControl', 'carState', 'controlsState', 'radarState', 'modelV2', 'frogpilotNavigation'], + pm = messaging.PubMaster(['longitudinalPlan', 'uiPlan']) + sm = messaging.SubMaster(['carControl', 'carState', 'controlsState', 'radarState', 'modelV2', 'frogpilotCarControl', 'frogpilotPlan'], poll='modelV2', ignore_avg_freq=['radarState']) while True: sm.update() if sm.updated['modelV2']: - longitudinal_planner.update(sm, frogpilot_planner, params_memory) - longitudinal_planner.publish(sm, pm, frogpilot_planner) + longitudinal_planner.update(sm) + longitudinal_planner.publish(sm, pm) publish_ui_plan(sm, pm, longitudinal_planner) - if params_memory.get_bool("FrogPilotTogglesUpdated"): - frogpilot_planner.update_frogpilot_params(params) - def main(): plannerd_thread() diff --git a/selfdrive/controls/radard.py b/selfdrive/controls/radard.py index e49efef..0fdc215 100755 --- a/selfdrive/controls/radard.py +++ b/selfdrive/controls/radard.py @@ -2,7 +2,7 @@ import importlib import math from collections import deque -from typing import Optional, Dict, Any +from typing import Any, Optional import capnp from cereal import messaging, log, car @@ -125,7 +125,7 @@ def laplacian_pdf(x: float, mu: float, b: float): return math.exp(-abs(x-mu)/b) -def match_vision_to_track(v_ego: float, lead: capnp._DynamicStructReader, tracks: Dict[int, Track]): +def match_vision_to_track(v_ego: float, lead: capnp._DynamicStructReader, tracks: dict[int, Track]): offset_vision_dist = lead.x[0] - RADAR_TO_CAMERA def prob(c): @@ -133,7 +133,7 @@ def match_vision_to_track(v_ego: float, lead: capnp._DynamicStructReader, tracks prob_y = laplacian_pdf(c.yRel, -lead.y[0], lead.yStd[0]) prob_v = laplacian_pdf(c.vRel + v_ego, lead.v[0], lead.vStd[0]) - # This is isn't exactly right, but good heuristic + # This isn't exactly right, but it's a good heuristic return prob_d * prob_y * prob_v track = max(tracks.values(), key=prob) @@ -166,8 +166,8 @@ def get_RadarState_from_vision(lead_msg: capnp._DynamicStructReader, v_ego: floa } -def get_lead(v_ego: float, ready: bool, tracks: Dict[int, Track], lead_msg: capnp._DynamicStructReader, - model_v_ego: float, lead_detection_threshold: float = .5, low_speed_override: bool = True) -> Dict[str, Any]: +def get_lead(v_ego: float, ready: bool, tracks: dict[int, Track], lead_msg: capnp._DynamicStructReader, + model_v_ego: float, lead_detection_threshold: float, low_speed_override: bool = True) -> dict[str, Any]: # Determine leads, this is where the essential logic happens if len(tracks) > 0 and ready and lead_msg.prob > lead_detection_threshold: track = match_vision_to_track(v_ego, lead_msg, tracks) @@ -196,14 +196,14 @@ class RadarD: def __init__(self, radar_ts: float, delay: int = 0): self.current_time = 0.0 - self.tracks: Dict[int, Track] = {} + self.tracks: dict[int, Track] = {} self.kalman_params = KalmanParams(radar_ts) self.v_ego = 0.0 self.v_ego_hist = deque([0.0], maxlen=delay+1) self.last_v_ego_frame = -1 - self.radar_state: Optional[capnp._DynamicStructBuilder] = None + self.radar_state: capnp._DynamicStructBuilder | None = None self.radar_state_valid = False self.ready = False diff --git a/selfdrive/controls/radardless.py b/selfdrive/controls/radardless.py new file mode 100644 index 0000000..1c8ad57 --- /dev/null +++ b/selfdrive/controls/radardless.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python3 +import importlib +from typing import Optional + +from cereal import messaging, car +from openpilot.common.params import Params +from openpilot.common.realtime import Ratekeeper, Priority, config_realtime_process +from openpilot.common.swaglog import cloudlog + + +# radar tracks +SPEED, ACCEL = 0, 1 # Kalman filter states enum + +# stationary qualification parameters +V_EGO_STATIONARY = 4. # no stationary object flag below this speed + +RADAR_TO_CENTER = 2.7 # (deprecated) RADAR is ~ 2.7m ahead from center of car +RADAR_TO_CAMERA = 1.52 # RADAR is ~ 1.5m ahead from center of mesh frame + + +class RadarD: + def __init__(self, radar_ts: float, delay: int = 0): + self.points: dict[int, tuple[float, float, float]] = {} + + self.radar_tracks_valid = False + + def update(self, rr: Optional[car.RadarData]): + radar_points = [] + radar_errors = [] + if rr is not None: + radar_points = rr.points + radar_errors = rr.errors + + self.radar_tracks_valid = len(radar_errors) == 0 + + self.points = {} + for pt in radar_points: + self.points[pt.trackId] = (pt.dRel, pt.yRel, pt.vRel) + + def publish(self): + tracks_msg = messaging.new_message('liveTracks', len(self.points)) + tracks_msg.valid = self.radar_tracks_valid + for index, tid in enumerate(sorted(self.points.keys())): + tracks_msg.liveTracks[index] = { + "trackId": tid, + "dRel": float(self.points[tid][0]) + RADAR_TO_CAMERA, + "yRel": -float(self.points[tid][1]), + "vRel": float(self.points[tid][2]), + } + + return tracks_msg + + +# publishes radar tracks +def main(): + config_realtime_process(5, Priority.CTRL_LOW) + + # wait for stats about the car to come in from controls + cloudlog.info("radard is waiting for CarParams") + with car.CarParams.from_bytes(Params().get("CarParams", block=True)) as msg: + CP = msg + cloudlog.info("radard got CarParams") + + # import the radar from the fingerprint + cloudlog.info("radard is importing %s", CP.carName) + RadarInterface = importlib.import_module(f'selfdrive.car.{CP.carName}.radar_interface').RadarInterface + + # *** setup messaging + can_sock = messaging.sub_sock('can') + pub_sock = messaging.pub_sock('liveTracks') + + RI = RadarInterface(CP) + + # TODO timing is different between cars, need a single time step for all cars + # TODO just take the fastest one for now, and keep resending same messages for slower radars + rk = Ratekeeper(1.0 / CP.radarTimeStep, print_delay_threshold=None) + RD = RadarD(CP.radarTimeStep, RI.delay) + + while 1: + can_strings = messaging.drain_sock_raw(can_sock, wait_for_one=True) + rr = RI.update(can_strings) + if rr is None: + continue + + RD.update(rr) + msg = RD.publish() + pub_sock.send(msg.to_bytes()) + + rk.monitor_time() + + +if __name__ == "__main__": + main() diff --git a/selfdrive/controls/tests/__init__.py b/selfdrive/controls/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/selfdrive/controls/tests/test_alerts.py b/selfdrive/controls/tests/test_alerts.py new file mode 100644 index 0000000..7b4fba0 --- /dev/null +++ b/selfdrive/controls/tests/test_alerts.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python3 +import copy +import json +import os +import unittest +import random +from PIL import Image, ImageDraw, ImageFont + +from cereal import log, car +from cereal.messaging import SubMaster +from openpilot.common.basedir import BASEDIR +from openpilot.common.params import Params +from openpilot.selfdrive.controls.lib.events import Alert, EVENTS, ET +from openpilot.selfdrive.controls.lib.alertmanager import set_offroad_alert +from openpilot.selfdrive.test.process_replay.process_replay import CONFIGS + +AlertSize = log.ControlsState.AlertSize + +OFFROAD_ALERTS_PATH = os.path.join(BASEDIR, "selfdrive/controls/lib/alerts_offroad.json") + +# TODO: add callback alerts +ALERTS = [] +for event_types in EVENTS.values(): + for alert in event_types.values(): + ALERTS.append(alert) + + +class TestAlerts(unittest.TestCase): + + @classmethod + def setUpClass(cls): + with open(OFFROAD_ALERTS_PATH) as f: + cls.offroad_alerts = json.loads(f.read()) + + # Create fake objects for callback + cls.CS = car.CarState.new_message() + cls.CP = car.CarParams.new_message() + cfg = [c for c in CONFIGS if c.proc_name == 'controlsd'][0] + cls.sm = SubMaster(cfg.pubs) + + def test_events_defined(self): + # Ensure all events in capnp schema are defined in events.py + events = car.CarEvent.EventName.schema.enumerants + + for name, e in events.items(): + if not name.endswith("DEPRECATED"): + fail_msg = "%s @%d not in EVENTS" % (name, e) + self.assertTrue(e in EVENTS.keys(), msg=fail_msg) + + # ensure alert text doesn't exceed allowed width + def test_alert_text_length(self): + font_path = os.path.join(BASEDIR, "selfdrive/assets/fonts") + regular_font_path = os.path.join(font_path, "Inter-SemiBold.ttf") + bold_font_path = os.path.join(font_path, "Inter-Bold.ttf") + semibold_font_path = os.path.join(font_path, "Inter-SemiBold.ttf") + + max_text_width = 2160 - 300 # full screen width is usable, minus sidebar + draw = ImageDraw.Draw(Image.new('RGB', (0, 0))) + + fonts = { + AlertSize.small: [ImageFont.truetype(semibold_font_path, 74)], + AlertSize.mid: [ImageFont.truetype(bold_font_path, 88), + ImageFont.truetype(regular_font_path, 66)], + } + + for alert in ALERTS: + if not isinstance(alert, Alert): + alert = alert(self.CP, self.CS, self.sm, metric=False, soft_disable_time=100) + + # for full size alerts, both text fields wrap the text, + # so it's unlikely that they would go past the max width + if alert.alert_size in (AlertSize.none, AlertSize.full): + continue + + for i, txt in enumerate([alert.alert_text_1, alert.alert_text_2]): + if i >= len(fonts[alert.alert_size]): + break + + font = fonts[alert.alert_size][i] + left, _, right, _ = draw.textbbox((0, 0), txt, font) + width = right - left + msg = f"type: {alert.alert_type} msg: {txt}" + self.assertLessEqual(width, max_text_width, msg=msg) + + def test_alert_sanity_check(self): + for event_types in EVENTS.values(): + for event_type, a in event_types.items(): + # TODO: add callback alerts + if not isinstance(a, Alert): + continue + + if a.alert_size == AlertSize.none: + self.assertEqual(len(a.alert_text_1), 0) + self.assertEqual(len(a.alert_text_2), 0) + elif a.alert_size == AlertSize.small: + self.assertGreater(len(a.alert_text_1), 0) + self.assertEqual(len(a.alert_text_2), 0) + elif a.alert_size == AlertSize.mid: + self.assertGreater(len(a.alert_text_1), 0) + self.assertGreater(len(a.alert_text_2), 0) + else: + self.assertGreater(len(a.alert_text_1), 0) + + self.assertGreaterEqual(a.duration, 0.) + + if event_type not in (ET.WARNING, ET.PERMANENT, ET.PRE_ENABLE): + self.assertEqual(a.creation_delay, 0.) + + def test_offroad_alerts(self): + params = Params() + for a in self.offroad_alerts: + # set the alert + alert = copy.copy(self.offroad_alerts[a]) + set_offroad_alert(a, True) + alert['extra'] = '' + self.assertTrue(json.dumps(alert) == params.get(a, encoding='utf8')) + + # then delete it + set_offroad_alert(a, False) + self.assertTrue(params.get(a) is None) + + def test_offroad_alerts_extra_text(self): + params = Params() + for i in range(50): + # set the alert + a = random.choice(list(self.offroad_alerts)) + alert = self.offroad_alerts[a] + set_offroad_alert(a, True, extra_text="a"*i) + + written_alert = json.loads(params.get(a, encoding='utf8')) + self.assertTrue("a"*i == written_alert['extra']) + self.assertTrue(alert["text"] == written_alert['text']) + +if __name__ == "__main__": + unittest.main() diff --git a/selfdrive/controls/tests/test_cruise_speed.py b/selfdrive/controls/tests/test_cruise_speed.py new file mode 100644 index 0000000..c46d03a --- /dev/null +++ b/selfdrive/controls/tests/test_cruise_speed.py @@ -0,0 +1,156 @@ +#!/usr/bin/env python3 +import itertools +import numpy as np +import unittest + +from parameterized import parameterized_class +from cereal import log +from openpilot.selfdrive.controls.lib.drive_helpers import VCruiseHelper, V_CRUISE_MIN, V_CRUISE_MAX, V_CRUISE_INITIAL, IMPERIAL_INCREMENT +from cereal import car +from openpilot.common.conversions import Conversions as CV +from openpilot.selfdrive.test.longitudinal_maneuvers.maneuver import Maneuver + +ButtonEvent = car.CarState.ButtonEvent +ButtonType = car.CarState.ButtonEvent.Type + + +def run_cruise_simulation(cruise, e2e, personality, t_end=20.): + man = Maneuver( + '', + duration=t_end, + initial_speed=max(cruise - 1., 0.0), + lead_relevancy=True, + initial_distance_lead=100, + cruise_values=[cruise], + prob_lead_values=[0.0], + breakpoints=[0.], + e2e=e2e, + personality=personality, + ) + valid, output = man.evaluate() + assert valid + return output[-1, 3] + + +@parameterized_class(("e2e", "personality", "speed"), itertools.product( + [True, False], # e2e + log.LongitudinalPersonality.schema.enumerants, # personality + [5,35])) # speed +class TestCruiseSpeed(unittest.TestCase): + def test_cruise_speed(self): + print(f'Testing {self.speed} m/s') + cruise_speed = float(self.speed) + + simulation_steady_state = run_cruise_simulation(cruise_speed, self.e2e, self.personality) + self.assertAlmostEqual(simulation_steady_state, cruise_speed, delta=.01, msg=f'Did not reach {self.speed} m/s') + + +# TODO: test pcmCruise +@parameterized_class(('pcm_cruise',), [(False,)]) +class TestVCruiseHelper(unittest.TestCase): + def setUp(self): + self.CP = car.CarParams(pcmCruise=self.pcm_cruise) + self.v_cruise_helper = VCruiseHelper(self.CP) + self.reset_cruise_speed_state() + + def reset_cruise_speed_state(self): + # Two resets previous cruise speed + for _ in range(2): + self.v_cruise_helper.update_v_cruise(car.CarState(cruiseState={"available": False}), enabled=False, is_metric=False) + + def enable(self, v_ego, experimental_mode): + # Simulates user pressing set with a current speed + self.v_cruise_helper.initialize_v_cruise(car.CarState(vEgo=v_ego), experimental_mode) + + def test_adjust_speed(self): + """ + Asserts speed changes on falling edges of buttons. + """ + + self.enable(V_CRUISE_INITIAL * CV.KPH_TO_MS, False) + + for btn in (ButtonType.accelCruise, ButtonType.decelCruise): + for pressed in (True, False): + CS = car.CarState(cruiseState={"available": True}) + CS.buttonEvents = [ButtonEvent(type=btn, pressed=pressed)] + + self.v_cruise_helper.update_v_cruise(CS, enabled=True, is_metric=False) + self.assertEqual(pressed, self.v_cruise_helper.v_cruise_kph == self.v_cruise_helper.v_cruise_kph_last) + + def test_rising_edge_enable(self): + """ + Some car interfaces may enable on rising edge of a button, + ensure we don't adjust speed if enabled changes mid-press. + """ + + # NOTE: enabled is always one frame behind the result from button press in controlsd + for enabled, pressed in ((False, False), + (False, True), + (True, False)): + CS = car.CarState(cruiseState={"available": True}) + CS.buttonEvents = [ButtonEvent(type=ButtonType.decelCruise, pressed=pressed)] + self.v_cruise_helper.update_v_cruise(CS, enabled=enabled, is_metric=False) + if pressed: + self.enable(V_CRUISE_INITIAL * CV.KPH_TO_MS, False) + + # Expected diff on enabling. Speed should not change on falling edge of pressed + self.assertEqual(not pressed, self.v_cruise_helper.v_cruise_kph == self.v_cruise_helper.v_cruise_kph_last) + + def test_resume_in_standstill(self): + """ + Asserts we don't increment set speed if user presses resume/accel to exit cruise standstill. + """ + + self.enable(0, False) + + for standstill in (True, False): + for pressed in (True, False): + CS = car.CarState(cruiseState={"available": True, "standstill": standstill}) + CS.buttonEvents = [ButtonEvent(type=ButtonType.accelCruise, pressed=pressed)] + self.v_cruise_helper.update_v_cruise(CS, enabled=True, is_metric=False) + + # speed should only update if not at standstill and button falling edge + should_equal = standstill or pressed + self.assertEqual(should_equal, self.v_cruise_helper.v_cruise_kph == self.v_cruise_helper.v_cruise_kph_last) + + def test_set_gas_pressed(self): + """ + Asserts pressing set while enabled with gas pressed sets + the speed to the maximum of vEgo and current cruise speed. + """ + + for v_ego in np.linspace(0, 100, 101): + self.reset_cruise_speed_state() + self.enable(V_CRUISE_INITIAL * CV.KPH_TO_MS, False) + + # first decrement speed, then perform gas pressed logic + expected_v_cruise_kph = self.v_cruise_helper.v_cruise_kph - IMPERIAL_INCREMENT + expected_v_cruise_kph = max(expected_v_cruise_kph, v_ego * CV.MS_TO_KPH) # clip to min of vEgo + expected_v_cruise_kph = float(np.clip(round(expected_v_cruise_kph, 1), V_CRUISE_MIN, V_CRUISE_MAX)) + + CS = car.CarState(vEgo=float(v_ego), gasPressed=True, cruiseState={"available": True}) + CS.buttonEvents = [ButtonEvent(type=ButtonType.decelCruise, pressed=False)] + self.v_cruise_helper.update_v_cruise(CS, enabled=True, is_metric=False) + + # TODO: fix skipping first run due to enabled on rising edge exception + if v_ego == 0.0: + continue + self.assertEqual(expected_v_cruise_kph, self.v_cruise_helper.v_cruise_kph) + + def test_initialize_v_cruise(self): + """ + Asserts allowed cruise speeds on enabling with SET. + """ + + for experimental_mode in (True, False): + for v_ego in np.linspace(0, 100, 101): + self.reset_cruise_speed_state() + self.assertFalse(self.v_cruise_helper.v_cruise_initialized) + + self.enable(float(v_ego), experimental_mode) + self.assertTrue(V_CRUISE_INITIAL <= self.v_cruise_helper.v_cruise_kph <= V_CRUISE_MAX) + self.assertTrue(self.v_cruise_helper.v_cruise_initialized) + + +if __name__ == "__main__": + unittest.main() diff --git a/selfdrive/controls/tests/test_following_distance.py b/selfdrive/controls/tests/test_following_distance.py new file mode 100644 index 0000000..f58e638 --- /dev/null +++ b/selfdrive/controls/tests/test_following_distance.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 +import unittest +import itertools +from parameterized import parameterized_class + +from cereal import log + +from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import desired_follow_distance, get_T_FOLLOW +from openpilot.selfdrive.test.longitudinal_maneuvers.maneuver import Maneuver + + +def run_following_distance_simulation(v_lead, t_end=100.0, e2e=False, personality=0): + man = Maneuver( + '', + duration=t_end, + initial_speed=float(v_lead), + lead_relevancy=True, + initial_distance_lead=100, + speed_lead_values=[v_lead], + breakpoints=[0.], + e2e=e2e, + personality=personality, + ) + valid, output = man.evaluate() + assert valid + return output[-1,2] - output[-1,1] + + +@parameterized_class(("e2e", "personality", "speed"), itertools.product( + [True, False], # e2e + [log.LongitudinalPersonality.relaxed, # personality + log.LongitudinalPersonality.standard, + log.LongitudinalPersonality.aggressive], + [0,10,35])) # speed +class TestFollowingDistance(unittest.TestCase): + def test_following_distance(self): + v_lead = float(self.speed) + simulation_steady_state = run_following_distance_simulation(v_lead, e2e=self.e2e, personality=self.personality) + correct_steady_state = desired_follow_distance(v_lead, v_lead, get_T_FOLLOW(self.personality)) + err_ratio = 0.2 if self.e2e else 0.1 + self.assertAlmostEqual(simulation_steady_state, correct_steady_state, delta=(err_ratio * correct_steady_state + .5)) + + +if __name__ == "__main__": + unittest.main() diff --git a/selfdrive/controls/tests/test_lateral_mpc.py b/selfdrive/controls/tests/test_lateral_mpc.py new file mode 100644 index 0000000..8c09f46 --- /dev/null +++ b/selfdrive/controls/tests/test_lateral_mpc.py @@ -0,0 +1,89 @@ +import unittest +import numpy as np +from openpilot.selfdrive.controls.lib.lateral_mpc_lib.lat_mpc import LateralMpc +from openpilot.selfdrive.controls.lib.drive_helpers import CAR_ROTATION_RADIUS +from openpilot.selfdrive.controls.lib.lateral_mpc_lib.lat_mpc import N as LAT_MPC_N + + +def run_mpc(lat_mpc=None, v_ref=30., x_init=0., y_init=0., psi_init=0., curvature_init=0., + lane_width=3.6, poly_shift=0.): + + if lat_mpc is None: + lat_mpc = LateralMpc() + lat_mpc.set_weights(1., .1, 0.0, .05, 800) + + y_pts = poly_shift * np.ones(LAT_MPC_N + 1) + heading_pts = np.zeros(LAT_MPC_N + 1) + curv_rate_pts = np.zeros(LAT_MPC_N + 1) + + x0 = np.array([x_init, y_init, psi_init, curvature_init]) + p = np.column_stack([v_ref * np.ones(LAT_MPC_N + 1), + CAR_ROTATION_RADIUS * np.ones(LAT_MPC_N + 1)]) + + # converge in no more than 10 iterations + for _ in range(10): + lat_mpc.run(x0, p, + y_pts, heading_pts, curv_rate_pts) + return lat_mpc.x_sol + + +class TestLateralMpc(unittest.TestCase): + + def _assert_null(self, sol, curvature=1e-6): + for i in range(len(sol)): + self.assertAlmostEqual(sol[0,i,1], 0., delta=curvature) + self.assertAlmostEqual(sol[0,i,2], 0., delta=curvature) + self.assertAlmostEqual(sol[0,i,3], 0., delta=curvature) + + def _assert_simmetry(self, sol, curvature=1e-6): + for i in range(len(sol)): + self.assertAlmostEqual(sol[0,i,1], -sol[1,i,1], delta=curvature) + self.assertAlmostEqual(sol[0,i,2], -sol[1,i,2], delta=curvature) + self.assertAlmostEqual(sol[0,i,3], -sol[1,i,3], delta=curvature) + self.assertAlmostEqual(sol[0,i,0], sol[1,i,0], delta=curvature) + + def test_straight(self): + sol = run_mpc() + self._assert_null(np.array([sol])) + + def test_y_symmetry(self): + sol = [] + for y_init in [-0.5, 0.5]: + sol.append(run_mpc(y_init=y_init)) + self._assert_simmetry(np.array(sol)) + + def test_poly_symmetry(self): + sol = [] + for poly_shift in [-1., 1.]: + sol.append(run_mpc(poly_shift=poly_shift)) + self._assert_simmetry(np.array(sol)) + + def test_curvature_symmetry(self): + sol = [] + for curvature_init in [-0.1, 0.1]: + sol.append(run_mpc(curvature_init=curvature_init)) + self._assert_simmetry(np.array(sol)) + + def test_psi_symmetry(self): + sol = [] + for psi_init in [-0.1, 0.1]: + sol.append(run_mpc(psi_init=psi_init)) + self._assert_simmetry(np.array(sol)) + + def test_no_overshoot(self): + y_init = 1. + sol = run_mpc(y_init=y_init) + for y in list(sol[:,1]): + self.assertGreaterEqual(y_init, abs(y)) + + def test_switch_convergence(self): + lat_mpc = LateralMpc() + sol = run_mpc(lat_mpc=lat_mpc, poly_shift=3.0, v_ref=7.0) + right_psi_deg = np.degrees(sol[:,2]) + sol = run_mpc(lat_mpc=lat_mpc, poly_shift=-3.0, v_ref=7.0) + left_psi_deg = np.degrees(sol[:,2]) + np.testing.assert_almost_equal(right_psi_deg, -left_psi_deg, decimal=3) + + +if __name__ == "__main__": + unittest.main() diff --git a/selfdrive/controls/tests/test_leads.py b/selfdrive/controls/tests/test_leads.py new file mode 100644 index 0000000..268d9c4 --- /dev/null +++ b/selfdrive/controls/tests/test_leads.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python3 +import unittest + +import cereal.messaging as messaging + +from openpilot.selfdrive.test.process_replay import replay_process_with_name +from openpilot.selfdrive.car.toyota.values import CAR as TOYOTA + + +class TestLeads(unittest.TestCase): + def test_radar_fault(self): + # if there's no radar-related can traffic, radard should either not respond or respond with an error + # this is tightly coupled with underlying car radar_interface implementation, but it's a good sanity check + def single_iter_pkg(): + # single iter package, with meaningless cans and empty carState/modelV2 + msgs = [] + for _ in range(5): + can = messaging.new_message("can", 1) + cs = messaging.new_message("carState") + msgs.append(can.as_reader()) + msgs.append(cs.as_reader()) + model = messaging.new_message("modelV2") + msgs.append(model.as_reader()) + + return msgs + + msgs = [m for _ in range(3) for m in single_iter_pkg()] + out = replay_process_with_name("radard", msgs, fingerprint=TOYOTA.COROLLA_TSS2) + states = [m for m in out if m.which() == "radarState"] + failures = [not state.valid and len(state.radarState.radarErrors) for state in states] + + self.assertTrue(len(states) == 0 or all(failures)) + + +if __name__ == "__main__": + unittest.main() diff --git a/selfdrive/controls/tests/test_startup.py b/selfdrive/controls/tests/test_startup.py new file mode 100644 index 0000000..34d14fb --- /dev/null +++ b/selfdrive/controls/tests/test_startup.py @@ -0,0 +1,120 @@ +import os +from parameterized import parameterized + +from cereal import log, car +import cereal.messaging as messaging +from openpilot.common.params import Params +from openpilot.selfdrive.boardd.boardd_api_impl import can_list_to_can_capnp +from openpilot.selfdrive.car.fingerprints import _FINGERPRINTS +from openpilot.selfdrive.car.toyota.values import CAR as TOYOTA +from openpilot.selfdrive.car.mazda.values import CAR as MAZDA +from openpilot.selfdrive.controls.lib.events import EVENT_NAME +from openpilot.selfdrive.manager.process_config import managed_processes + +EventName = car.CarEvent.EventName +Ecu = car.CarParams.Ecu + +COROLLA_FW_VERSIONS = [ + (Ecu.engine, 0x7e0, None, b'\x0230ZC2000\x00\x00\x00\x00\x00\x00\x00\x0050212000\x00\x00\x00\x00\x00\x00\x00\x00'), + (Ecu.abs, 0x7b0, None, b'F152602190\x00\x00\x00\x00\x00\x00'), + (Ecu.eps, 0x7a1, None, b'8965B02181\x00\x00\x00\x00\x00\x00'), + (Ecu.fwdRadar, 0x750, 0xf, b'8821F4702100\x00\x00\x00\x00'), + (Ecu.fwdCamera, 0x750, 0x6d, b'8646F0201101\x00\x00\x00\x00'), + (Ecu.dsu, 0x791, None, b'881510201100\x00\x00\x00\x00'), +] +COROLLA_FW_VERSIONS_FUZZY = COROLLA_FW_VERSIONS[:-1] + [(Ecu.dsu, 0x791, None, b'xxxxxx')] +COROLLA_FW_VERSIONS_NO_DSU = COROLLA_FW_VERSIONS[:-1] + +CX5_FW_VERSIONS = [ + (Ecu.engine, 0x7e0, None, b'PYNF-188K2-F\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'), + (Ecu.abs, 0x760, None, b'K123-437K2-E\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'), + (Ecu.eps, 0x730, None, b'KJ01-3210X-G-00\x00\x00\x00\x00\x00\x00\x00\x00\x00'), + (Ecu.fwdRadar, 0x764, None, b'K123-67XK2-F\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'), + (Ecu.fwdCamera, 0x706, None, b'B61L-67XK2-T\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'), + (Ecu.transmission, 0x7e1, None, b'PYNC-21PS1-B\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'), +] + + +@parameterized.expand([ + # TODO: test EventName.startup for release branches + + # officially supported car + (EventName.startupMaster, TOYOTA.COROLLA, COROLLA_FW_VERSIONS, "toyota"), + (EventName.startupMaster, TOYOTA.COROLLA, COROLLA_FW_VERSIONS, "toyota"), + + # dashcamOnly car + (EventName.startupNoControl, MAZDA.CX5, CX5_FW_VERSIONS, "mazda"), + (EventName.startupNoControl, MAZDA.CX5, CX5_FW_VERSIONS, "mazda"), + + # unrecognized car with no fw + (EventName.startupNoFw, None, None, ""), + (EventName.startupNoFw, None, None, ""), + + # unrecognized car + (EventName.startupNoCar, None, COROLLA_FW_VERSIONS[:1], "toyota"), + (EventName.startupNoCar, None, COROLLA_FW_VERSIONS[:1], "toyota"), + + # fuzzy match + (EventName.startupMaster, TOYOTA.COROLLA, COROLLA_FW_VERSIONS_FUZZY, "toyota"), + (EventName.startupMaster, TOYOTA.COROLLA, COROLLA_FW_VERSIONS_FUZZY, "toyota"), +]) +def test_startup_alert(expected_event, car_model, fw_versions, brand): + controls_sock = messaging.sub_sock("controlsState") + pm = messaging.PubMaster(['can', 'pandaStates']) + + params = Params() + params.put_bool("OpenpilotEnabledToggle", True) + + # Build capnn version of FW array + if fw_versions is not None: + car_fw = [] + cp = car.CarParams.new_message() + for ecu, addr, subaddress, version in fw_versions: + f = car.CarParams.CarFw.new_message() + f.ecu = ecu + f.address = addr + f.fwVersion = version + f.brand = brand + + if subaddress is not None: + f.subAddress = subaddress + + car_fw.append(f) + cp.carVin = "1" * 17 + cp.carFw = car_fw + params.put("CarParamsCache", cp.to_bytes()) + else: + os.environ['SKIP_FW_QUERY'] = '1' + + managed_processes['controlsd'].start() + + assert pm.wait_for_readers_to_update('can', 5) + pm.send('can', can_list_to_can_capnp([[0, 0, b"", 0]])) + + assert pm.wait_for_readers_to_update('pandaStates', 5) + msg = messaging.new_message('pandaStates', 1) + msg.pandaStates[0].pandaType = log.PandaState.PandaType.uno + pm.send('pandaStates', msg) + + # fingerprint + if (car_model is None) or (fw_versions is not None): + finger = {addr: 1 for addr in range(1, 100)} + else: + finger = _FINGERPRINTS[car_model][0] + + msgs = [[addr, 0, b'\x00'*length, 0] for addr, length in finger.items()] + for _ in range(1000): + # controlsd waits for boardd to echo back that it has changed the multiplexing mode + if not params.get_bool("ObdMultiplexingChanged"): + params.put_bool("ObdMultiplexingChanged", True) + + pm.send('can', can_list_to_can_capnp(msgs)) + assert pm.wait_for_readers_to_update('can', 5, dt=0.001), f"step: {_}" + + ctrls = messaging.drain_sock(controls_sock) + if len(ctrls): + event_name = ctrls[0].controlsState.alertType.split("/")[0] + assert EVENT_NAME[expected_event] == event_name, f"expected {EVENT_NAME[expected_event]} for '{car_model}', got {event_name}" + break + else: + raise Exception(f"failed to fingerprint {car_model}") diff --git a/selfdrive/controls/tests/test_state_machine.py b/selfdrive/controls/tests/test_state_machine.py new file mode 100644 index 0000000..d491117 --- /dev/null +++ b/selfdrive/controls/tests/test_state_machine.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python3 +import unittest + +from cereal import car, log +from openpilot.common.realtime import DT_CTRL +from openpilot.selfdrive.car.car_helpers import interfaces +from openpilot.selfdrive.controls.controlsd import Controls, SOFT_DISABLE_TIME +from openpilot.selfdrive.controls.lib.events import Events, ET, Alert, Priority, AlertSize, AlertStatus, VisualAlert, \ + AudibleAlert, EVENTS +from openpilot.selfdrive.car.mock.values import CAR as MOCK + +State = log.ControlsState.OpenpilotState + +# The event types that maintain the current state +MAINTAIN_STATES = {State.enabled: (None,), State.disabled: (None,), State.softDisabling: (ET.SOFT_DISABLE,), + State.preEnabled: (ET.PRE_ENABLE,), State.overriding: (ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL)} +ALL_STATES = tuple(State.schema.enumerants.values()) +# The event types checked in DISABLED section of state machine +ENABLE_EVENT_TYPES = (ET.ENABLE, ET.PRE_ENABLE, ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL) + + +def make_event(event_types): + event = {} + for ev in event_types: + event[ev] = Alert("", "", AlertStatus.normal, AlertSize.small, Priority.LOW, + VisualAlert.none, AudibleAlert.none, 1.) + EVENTS[0] = event + return 0 + + +class TestStateMachine(unittest.TestCase): + + def setUp(self): + CarInterface, CarController, CarState = interfaces[MOCK.MOCK] + CP = CarInterface.get_non_essential_params(MOCK.MOCK) + CI = CarInterface(CP, CarController, CarState) + + self.controlsd = Controls(CI=CI) + self.controlsd.events = Events() + self.controlsd.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL) + self.CS = car.CarState() + + def test_immediate_disable(self): + for state in ALL_STATES: + for et in MAINTAIN_STATES[state]: + self.controlsd.events.add(make_event([et, ET.IMMEDIATE_DISABLE])) + self.controlsd.state = state + self.controlsd.state_transition(self.CS) + self.assertEqual(State.disabled, self.controlsd.state) + self.controlsd.events.clear() + + def test_user_disable(self): + for state in ALL_STATES: + for et in MAINTAIN_STATES[state]: + self.controlsd.events.add(make_event([et, ET.USER_DISABLE])) + self.controlsd.state = state + self.controlsd.state_transition(self.CS) + self.assertEqual(State.disabled, self.controlsd.state) + self.controlsd.events.clear() + + def test_soft_disable(self): + for state in ALL_STATES: + if state == State.preEnabled: # preEnabled considers NO_ENTRY instead + continue + for et in MAINTAIN_STATES[state]: + self.controlsd.events.add(make_event([et, ET.SOFT_DISABLE])) + self.controlsd.state = state + self.controlsd.state_transition(self.CS) + self.assertEqual(self.controlsd.state, State.disabled if state == State.disabled else State.softDisabling) + self.controlsd.events.clear() + + def test_soft_disable_timer(self): + self.controlsd.state = State.enabled + self.controlsd.events.add(make_event([ET.SOFT_DISABLE])) + self.controlsd.state_transition(self.CS) + for _ in range(int(SOFT_DISABLE_TIME / DT_CTRL)): + self.assertEqual(self.controlsd.state, State.softDisabling) + self.controlsd.state_transition(self.CS) + + self.assertEqual(self.controlsd.state, State.disabled) + + def test_no_entry(self): + # Make sure noEntry keeps us disabled + for et in ENABLE_EVENT_TYPES: + self.controlsd.events.add(make_event([ET.NO_ENTRY, et])) + self.controlsd.state_transition(self.CS) + self.assertEqual(self.controlsd.state, State.disabled) + self.controlsd.events.clear() + + def test_no_entry_pre_enable(self): + # preEnabled with noEntry event + self.controlsd.state = State.preEnabled + self.controlsd.events.add(make_event([ET.NO_ENTRY, ET.PRE_ENABLE])) + self.controlsd.state_transition(self.CS) + self.assertEqual(self.controlsd.state, State.preEnabled) + + def test_maintain_states(self): + # Given current state's event type, we should maintain state + for state in ALL_STATES: + for et in MAINTAIN_STATES[state]: + self.controlsd.state = state + self.controlsd.events.add(make_event([et])) + self.controlsd.state_transition(self.CS) + self.assertEqual(self.controlsd.state, state) + self.controlsd.events.clear() + + +if __name__ == "__main__": + unittest.main()