from cereal import car from openpilot.common.conversions import Conversions as CV from openpilot.common.numpy_fast import clip from openpilot.common.realtime import DT_CTRL from opendbc.can.packer import CANPacker from openpilot.selfdrive.car import apply_driver_steer_torque_limits, common_fault_avoidance from openpilot.selfdrive.car.hyundai import hyundaicanfd, hyundaican from openpilot.selfdrive.car.hyundai.hyundaicanfd import CanBus from openpilot.selfdrive.car.hyundai.values import HyundaiFlags, Buttons, CarControllerParams, CANFD_CAR, CAR, LEGACY_SAFETY_MODE_CAR from openpilot.selfdrive.controls.lib.drive_helpers import V_CRUISE_MAX from openpilot.common.params import Params from openpilot.common.watcher import Watcher # DEVELOPOMENT TODO # - Write a loop that runs and does nothing. # - Load as many inputs in possible into state variables. # - Make a state variable tester. It should be a web server running async. # - Make a button tester, also on web server, to test engaging each button. # - Write basic interface refactor # - Write experimental mode speed controller # - Write resume from stop controller # - Implement as many extra features as possible (auto hvac, auto sunroof, suggest a break, remind headlights) VisualAlert = car.CarControl.HUDControl.VisualAlert LongCtrlState = car.CarControl.Actuators.LongControlState params_memory = Params("/dev/shm/params") # EPS faults if you apply torque while the steering angle is above 90 degrees for more than 1 second # All slightly below EPS thresholds to avoid fault MAX_ANGLE = 85 MAX_ANGLE_FRAMES = 89 MAX_ANGLE_CONSECUTIVE_FRAMES = 2 # Constants for state arrays. ENGAGED = 'engaged' STANDBY = 'standby' OFF = 'off' # Input # Current state of openpilot and its desired commands openpilot_state = { 'openpilot_ready': False, # Openpilot is enabled in settings 'lateral_active': False, # Lateral control is engaged either full or always on lateral 'openpilot_engaged': False, # Openpilot is full engaged (lateral + cruise control) 'always_on_lateral_active': False, # Cruise control disengaged but lateral still engaged 'experimental_active': False, # Frogpilot has engaged experimental control, so we need to adjust speed and possibly apply brakes (if this is possible) 'experimental_desired_speed': 0, # The speed experimental control wants to set long to 'curviture_angle': 0, # Detected current curviture (ideally within the next 3 or 4 seconds) for taking over LKAS and possibly LONG 'lane_change_enabled': False, # The lane change assist feature is active 'lane_change_active': False, # Executing a lane change assist 'connected_to_internet': False, 'rate_limited_internet': False } # Input # Esoteric things related to the car that support other features than driving. Will be shown in debugger. car_state = { 'location_lat': 0, 'location_long': 0, 'drive_distance_this_trip': 0, 'drive_distance_today': 0, 'compass_direction': 'N', # Unclear if I will use this 'fuel': 1, # 0-1 fuel tank level 'windows_down': False, # True if any windows / sunroof is open 'climate_control': False, 'climate_control_set_temp': 72, 'cabin_temprature': 50, 'heated_seat_driver_on': False, 'heated_seat_passenger_on': False, 'heated_steering_wheel_on': False, 'fan_seat_driver_on': False, 'fan_seat_passenger_on': False, 'info_panel_item_showing': 0, # If we can capture what instrument panel info is showing, we can change it to speed 'daytime': False, 'headlights': False, # Would be cool to remind to have the headlights turned on if moving } # Input # The current status of cruise control activation cruise_control_state = { 'set_speed': 0, # Cruise control set speed 'actual_speed': 0, # Spedometer reading 'speed_limit': 0, # Speed limit as reported by car's dashboard 'cruise_control_status': OFF, # or ENGAGED or STANDBY 'brake_pressed': False } # Input # The current state of the car in front of us, if cruise control is engaged but the car is not moving stationary_state = { 'stationary': False, # When speed reaches 0 we are stationary 'stationary_since': 0, # At what time stationary was achieved 'stationary_has_lead': False, # At the time of becoming stationary, was there a lead veichele? 'stationary_has_lead_distance': 0, # At the time of becoming stationary, what was the distance to the lead veichele? 'current_lead_car_distance': 0, # What is the distance to the lead veichele now? 'lead_vehicle_moving_away': False, # Has logic decided that the current veichele is moving away? 'oem_lanes_detected': False # Is the car reporting it can see lanes? I would like to experiment with engaging stock LKAS. } # Input # What buttons the user is pressing. Can be None, BUTTON_SHORT, or BUTTON_LONG (held for 1/2 second) cruise_control_buttons_input = { 'engage_cruise_control': None, 'cruise_control_speed_up': None, 'cruise_control_speed_down': None, 'pause_resume_cruise_control': None, 'lane_keep_assist_button': None, 'lane_follow_assist_button': None } # Output # What buttons we wish to simulate pressing. cruise_control_buttons_output = { 'engage_cruise_control': False, 'cruise_control_speed_up': False, 'cruise_control_speed_down': False, 'pause_resume_cruise_control': False, 'lane_keep_assist_button': False, 'lane_follow_assist_button': False, } # Output # What buttons we wish to simulate pressing. # I don't know if I will have access to these. other_buttons_output = { 'info_category': False, 'info_section': False, 'heated_seat_driver': False, 'heated_seat_passenger': False, 'heated_steering_wheel': False, 'fan_seat_driver': False, 'fan_seat_passenger': False, 'sunroof': False, 'hvac_power': False, 'hvac_sync': False, 'hvac_up': False, 'hvac_down': False } # What to display on the instrument panel. Sent every frame. instrument_panel_outputs = { 'lane_keeping_assist_active': False, 'forward_collision_warning_active': False, 'high_beam_assist_active': False, 'forward_collision_avoidance_assist_active': False, 'blind_spot_collision_warning_active': False, 'lane_following_assist_active': False, 'driver_attention_warning_active': False, 'lane_departure_warning_active': False } # State # Variables to save what oscarpilot is doing - behaviors will change depending # on current state oscarpilot_state = [ 'set_speed': 0, # Current desired cruise speed 'speed_override': False, # Set if moving away from set speed due to experimental 'speed_restore': False, # Set if moving twoards set speed due to reengagement or change speed to speed limit 'lkas_oem': False, # Set if allowing the car oem software to control LKAS 'daytime': False, # Set if it is daytime. Car is more judgmental at night. ] # Output # These are messages to the user, and get set to false when processed by openpilot # Ideally we can set these in the infotainment area as well, like carplay audio selection oscarpilot_alerts = { 'detected_stop_sign_or_light': False, # Openpilot has seen a stoplight or stopsign 'very_sharp_curve': False, # The upcoming curve is probably too sharp to be handled automatically 'lane_confidence_low': False, # The confidence level that openpilot knows what its doing is low 'heavy_traffic_ahead': False, # Mapping data indicates heavy traffic within 2 miles 'weather_ahead': False, # Mapping data indicates precipitation within 5 miles 'suggest_a_break': False, # Variable will be used to suggest a break (3 hours at day, 1.5 hours at night) } # Where is this called? Public? Private? Should we refactor? def process_hud_alert(enabled, fingerprint, hud_control): sys_warning = (hud_control.visualAlert in (VisualAlert.steerRequired, VisualAlert.ldw)) # initialize to no line visible # TODO: this is not accurate for all cars sys_state = 1 if hud_control.leftLaneVisible and hud_control.rightLaneVisible or sys_warning: # HUD alert only display when LKAS status is active sys_state = 3 if enabled or sys_warning else 4 elif hud_control.leftLaneVisible: sys_state = 5 elif hud_control.rightLaneVisible: sys_state = 6 # initialize to no warnings left_lane_warning = 0 right_lane_warning = 0 if hud_control.leftLaneDepart: left_lane_warning = 1 if fingerprint in (CAR.GENESIS_G90, CAR.GENESIS_G80) else 2 if hud_control.rightLaneDepart: right_lane_warning = 1 if fingerprint in (CAR.GENESIS_G90, CAR.GENESIS_G80) else 2 return sys_warning, sys_state, left_lane_warning, right_lane_warning class CarController: def __init__(self, dbc_name, CP, VM): self.CP = CP self.CAN = CanBus(CP) self.params = CarControllerParams(CP) self.packer = CANPacker(dbc_name) self.angle_limit_counter = 0 self.frame = 0 self.accel_last = 0 self.apply_steer_last = 0 self.car_fingerprint = CP.carFingerprint self.last_button_frame = 0 self.last_resume_frame = 0 self.last_debug_frame = 0 self._CC = None # self._CC.actuators # self._CC.hudControl self._CS = None self._now_nanos = None self._sport_plus = None self._actuators = None self._hud_control = None def update(self, CC, CS, now_nanos, sport_plus): self._CC = CC self._CS = CS self._now_nanos = now_nanos self._sport_plus = sport_plus self._apply_steer = None self._accel = None can_sends = [] # CAN-FD car logic if self.CP.carFingerprint in CANFD_CAR: # Steering control for CAN FD # can_sends.extend(self._create_common_messages()) can_sends.extend(self._create_steering_messages()) can_sends.extend(self._create_instrument_messages()) can_sends.extend(self._create_button_messages()) new_actuators = actuators.copy() new_actuators.steer = apply_steer / self.params.STEER_MAX new_actuators.steerOutputCan = apply_steer new_actuators.accel = accel self.frame += 1 return new_actuators, can_sends # def _create_common_messages(): # *** common hyundai stuff *** def _unprocessed(): # HUD messages sys_warning, sys_state, left_lane_warning, right_lane_warning = process_hud_alert(CC.enabled, self.car_fingerprint, hud_control) can_sends = [] # CAN-FD platforms # if self.CP.carFingerprint in CANFD_CAR: # # LFA and HDA icons # if self.frame % 5 == 0 and (not hda2 or hda2_long): # can_sends.append(hyundaicanfd.create_lfahda_cluster(self.packer, self.CAN, CC.enabled)) # if self.CP.openpilotLongitudinalControl: # if hda2: # can_sends.extend(hyundaicanfd.create_adrv_messages(self.packer, self.CAN, self.frame)) # if self.frame % 2 == 0: # can_sends.append(hyundaicanfd.create_acc_control(self.packer, self.CAN, CC.enabled, self.accel_last, accel, stopping, CC.cruiseControl.override, # set_speed_in_units, CS.personality_profile)) # self.accel_last = accel # else: # # button presses # can_sends.extend(self.create_button_messages(CC, CS, use_clu11=False)) # Mostly unchanged from frogpilot. def _create_steering_messages(): can_sends = [] # steering torque new_steer = int(round(actuators.steer * self.params.STEER_MAX)) apply_steer = apply_driver_steer_torque_limits(new_steer, self.apply_steer_last, self._CS.out.steeringTorque, self.params) # >90 degree steering fault prevention self.angle_limit_counter, apply_steer_req = common_fault_avoidance(abs(self._CS.out.steeringAngleDeg) >= MAX_ANGLE, self._CC.latActive, self.angle_limit_counter, MAX_ANGLE_FRAMES, MAX_ANGLE_CONSECUTIVE_FRAMES) if not CC.latActive: apply_steer = 0 # Hold torque with induced temporary fault when cutting the actuation bit torque_fault = self._CC.latActive and not apply_steer_req self.apply_steer_last = apply_steer # accel + longitudinal if sport_plus: accel = clip(actuators.accel, CarControllerParams.ACCEL_MIN, CarControllerParams.ACCEL_MAX_PLUS) else: accel = clip(actuators.accel, CarControllerParams.ACCEL_MIN, CarControllerParams.ACCEL_MAX) stopping = actuators.longControlState == LongCtrlState.stopping set_speed_in_units = hud_control.setSpeed * (CV.MS_TO_KPH if CS.is_metric else CV.MS_TO_MPH) hda2 = self.CP.flags & HyundaiFlags.CANFD_HDA2 # steering control can_sends.extend(hyundaicanfd.create_steering_messages(self.packer, self.CP, self.CAN, CC.enabled, apply_steer_req, apply_steer)) # prevent LFA from activating on HDA2 by sending "no lane lines detected" to ADAS ECU if self.frame % 5 == 0 and hda2: can_sends.append(hyundaicanfd.create_suppress_lfa(self.packer, self.CAN, CS.hda2_lfa_block_msg, self.CP.flags & HyundaiFlags.CANFD_HDA2_ALT_STEERING)) # blinkers # if hda2 and self.CP.flags & HyundaiFlags.ENABLE_BLINKERS: # can_sends.extend(hyundaicanfd.create_spas_messages(self.packer, self.CAN, self.frame, CC.leftBlinker, CC.rightBlinker)) self._apply_steer = apply_steer self._accel = accel return can_sends # Placeholder for potential instrument panel processing def _create_instrument_messages(instrument_panel_state): can_sends = [] # Assume default values or read the initial state lfa_icon_state = 0 hda_active = 0 hda_icon_state = 0 hda_set_speed = 0 # This would likely need to be more dynamic based on context # Assign values based on instrument_panel_outputs states for output in instrument_panel_outputs: if output['name'] == 'lane_keeping_assist_active': lfa_icon_state = 2 if output['value'] else 0 elif output['name'] == 'hda_active': # Hypothetical key for example hda_active = 1 if output['value'] else 0 elif output['name'] == 'hda_icon_state': # Also hypothetical hda_icon_state = 2 if output['value'] else 0 # Add other conditions based on the actual data structure you expect # Assume we can bundle all these states into one message as in your example values = { "LFA_Icon_State": lfa_icon_state, "HDA_Active": hda_active, "HDA_Icon_State": hda_icon_state, "HDA_VSetReq": hda_set_speed, # This example assumes static speed; adjust if dynamic } can_sends.append(packer.make_can_msg("LFAHDA_MFC", 0, values)) return can_sends # Placeholder for potential button press processing def _create_button_messages(button_presses): can_sends = [] for button in cruise_control_buttons: if button['value']: # Resetting button state to prevent repeated presses button['value'] = False if button['name'] in button_map: # Mapping each button to a specific CAN message btn_value = button_map[button['name']] can_sends.append(create_buttons(packer, CP, CAN, 0, btn_value)) # Counter is example, replace with actual logic if necessary return can_sends