Added toggle to use turn desires when below the minimum lane change speed for more precise turns.
214 lines
10 KiB
Python
214 lines
10 KiB
Python
import cereal.messaging as messaging
|
|
import numpy as np
|
|
|
|
from openpilot.common.conversions import Conversions as CV
|
|
from openpilot.common.numpy_fast import clip, interp
|
|
from openpilot.selfdrive.controls.lib.drive_helpers import CONTROL_N, V_CRUISE_MAX
|
|
from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import T_IDXS as T_IDXS_MPC
|
|
from openpilot.selfdrive.controls.lib.longitudinal_planner import A_CRUISE_MIN, A_CRUISE_MAX_VALS, A_CRUISE_MAX_BP, get_max_accel
|
|
from openpilot.selfdrive.modeld.constants import ModelConstants
|
|
|
|
from openpilot.selfdrive.frogpilot.functions.conditional_experimental_mode import ConditionalExperimentalMode
|
|
from openpilot.selfdrive.frogpilot.functions.map_turn_speed_controller import MapTurnSpeedController
|
|
from openpilot.selfdrive.frogpilot.functions.speed_limit_controller import SpeedLimitController
|
|
|
|
# Acceleration profiles - Credit goes to the DragonPilot team!
|
|
# MPH = [0., 35, 35, 40, 40, 45, 45, 67, 67, 67, 123]
|
|
A_CRUISE_MIN_BP_CUSTOM = [0., 2.0, 2.01, 11., 11.01, 18., 18.01, 28., 28.01, 33., 55.]
|
|
# MPH = [0., 6.71, 13.4, 17.9, 24.6, 33.6, 44.7, 55.9, 67.1, 123]
|
|
A_CRUISE_MAX_BP_CUSTOM = [0., 3, 6., 8., 11., 15., 20., 25., 30., 55.]
|
|
|
|
A_CRUISE_MIN_VALS_ECO_TUNE = [-0.480, -0.480, -0.40, -0.40, -0.40, -0.36, -0.32, -0.28, -0.28, -0.25, -0.25]
|
|
A_CRUISE_MAX_VALS_ECO_TUNE = [3.5, 3.3, 1.7, 1.1, .76, .62, .47, .36, .28, .09]
|
|
|
|
A_CRUISE_MIN_VALS_SPORT_TUNE = [-0.500, -0.500, -0.42, -0.42, -0.42, -0.42, -0.40, -0.35, -0.35, -0.30, -0.30]
|
|
A_CRUISE_MAX_VALS_SPORT_TUNE = [3.5, 3.5, 3.0, 2.6, 1.4, 1.0, 0.7, 0.6, .38, .2]
|
|
|
|
def get_min_accel_eco_tune(v_ego):
|
|
return interp(v_ego, A_CRUISE_MIN_BP_CUSTOM, A_CRUISE_MIN_VALS_ECO_TUNE)
|
|
|
|
def get_max_accel_eco_tune(v_ego):
|
|
return interp(v_ego, A_CRUISE_MAX_BP_CUSTOM, A_CRUISE_MAX_VALS_ECO_TUNE)
|
|
|
|
def get_min_accel_sport_tune(v_ego):
|
|
return interp(v_ego, A_CRUISE_MIN_BP_CUSTOM, A_CRUISE_MIN_VALS_SPORT_TUNE)
|
|
|
|
def get_max_accel_sport_tune(v_ego):
|
|
return interp(v_ego, A_CRUISE_MAX_BP_CUSTOM, A_CRUISE_MAX_VALS_SPORT_TUNE)
|
|
|
|
def calculate_lane_width(lane, current_lane, road_edge):
|
|
lane_x, lane_y = np.array(lane.x), np.array(lane.y)
|
|
edge_x, edge_y = np.array(road_edge.x), np.array(road_edge.y)
|
|
current_x, current_y = np.array(current_lane.x), np.array(current_lane.y)
|
|
|
|
lane_y_interp = np.interp(current_x, lane_x[lane_x.argsort()], lane_y[lane_x.argsort()])
|
|
road_edge_y_interp = np.interp(current_x, edge_x[edge_x.argsort()], edge_y[edge_x.argsort()])
|
|
|
|
distance_to_lane = np.mean(np.abs(current_y - lane_y_interp))
|
|
distance_to_road_edge = np.mean(np.abs(current_y - road_edge_y_interp))
|
|
|
|
return min(distance_to_lane, distance_to_road_edge)
|
|
|
|
|
|
class FrogPilotPlanner:
|
|
def __init__(self, params):
|
|
self.cem = ConditionalExperimentalMode()
|
|
self.mtsc = MapTurnSpeedController()
|
|
|
|
self.override_slc = False
|
|
|
|
self.overridden_speed = 0
|
|
self.mtsc_target = 0
|
|
self.slc_target = 0
|
|
self.v_cruise = 0
|
|
|
|
self.x_desired_trajectory = np.zeros(CONTROL_N)
|
|
|
|
self.update_frogpilot_params(params)
|
|
|
|
def update(self, sm, mpc):
|
|
carState, controlsState, modelData = sm['carState'], sm['controlsState'], sm['modelV2']
|
|
|
|
enabled = controlsState.enabled
|
|
|
|
v_cruise_kph = min(controlsState.vCruise, V_CRUISE_MAX)
|
|
v_cruise = v_cruise_kph * CV.KPH_TO_MS
|
|
v_ego = carState.vEgo
|
|
|
|
# Acceleration profiles
|
|
v_cruise_changed = (self.mtsc_target) + 1 < v_cruise # Use stock acceleration profiles to handle MTSC more precisely
|
|
if v_cruise_changed:
|
|
self.accel_limits = [A_CRUISE_MIN, get_max_accel(v_ego)]
|
|
elif self.acceleration_profile == 1:
|
|
self.accel_limits = [get_min_accel_eco_tune(v_ego), get_max_accel_eco_tune(v_ego)]
|
|
elif self.acceleration_profile in (2, 3):
|
|
self.accel_limits = [get_min_accel_sport_tune(v_ego), get_max_accel_sport_tune(v_ego)]
|
|
else:
|
|
self.accel_limits = [A_CRUISE_MIN, get_max_accel(v_ego)]
|
|
|
|
# Conditional Experimental Mode
|
|
if self.conditional_experimental_mode and enabled:
|
|
self.cem.update(carState, sm['frogpilotNavigation'], sm['modelV2'], mpc, sm['radarState'], carState.standstill, v_ego)
|
|
|
|
self.v_cruise = self.update_v_cruise(carState, controlsState, modelData, enabled, v_cruise, v_ego)
|
|
|
|
self.x_desired_trajectory_full = np.interp(ModelConstants.T_IDXS, T_IDXS_MPC, mpc.x_solution)
|
|
self.x_desired_trajectory = self.x_desired_trajectory_full[:CONTROL_N]
|
|
|
|
def update_v_cruise(self, carState, controlsState, modelData, enabled, v_cruise, v_ego):
|
|
# Pfeiferj's Map Turn Speed Controller
|
|
if self.map_turn_speed_controller:
|
|
self.mtsc_target = np.clip(self.mtsc.target_speed(v_ego, carState.aEgo), MIN_TARGET_V, v_cruise)
|
|
if self.mtsc_target == MIN_TARGET_V:
|
|
self.mtsc_target = v_cruise
|
|
else:
|
|
self.mtsc_target = v_cruise
|
|
|
|
# Pfeiferj's Speed Limit Controller
|
|
if self.speed_limit_controller:
|
|
SpeedLimitController.update_current_max_velocity(v_cruise)
|
|
self.slc_target = SpeedLimitController.desired_speed_limit
|
|
|
|
# Override SLC upon gas pedal press and reset upon brake/cancel button
|
|
if self.speed_limit_controller_override:
|
|
self.override_slc |= carState.gasPressed
|
|
self.override_slc &= enabled
|
|
self.override_slc &= v_ego > self.slc_target
|
|
else:
|
|
self.override_slc = False
|
|
|
|
self.overridden_speed *= enabled
|
|
|
|
# Use the override speed if SLC is being overridden
|
|
if self.override_slc:
|
|
if self.speed_limit_controller_override == 1:
|
|
# Set the max speed to the manual set speed
|
|
if carState.gasPressed:
|
|
self.overridden_speed = np.clip(v_ego, self.slc_target, v_cruise)
|
|
self.slc_target = self.overridden_speed
|
|
elif self.speed_limit_controller_override == 2:
|
|
self.overridden_speed = v_cruise
|
|
self.slc_target = v_cruise
|
|
if self.slc_target == 0:
|
|
self.slc_target = v_cruise
|
|
else:
|
|
self.overriden_speed = 0
|
|
self.slc_target = v_cruise
|
|
|
|
v_ego_diff = max(carState.vEgoRaw - carState.vEgoCluster, 0)
|
|
return min(v_cruise, self.mtsc_target, self.slc_target) - v_ego_diff
|
|
|
|
def publish_lateral(self, sm, pm, DH):
|
|
frogpilot_lateral_plan_send = messaging.new_message('frogpilotLateralPlan')
|
|
frogpilot_lateral_plan_send.valid = sm.all_checks(service_list=['carState', 'controlsState', 'modelV2'])
|
|
frogpilotLateralPlan = frogpilot_lateral_plan_send.frogpilotLateralPlan
|
|
|
|
frogpilotLateralPlan.laneWidthLeft = float(DH.lane_width_left)
|
|
frogpilotLateralPlan.laneWidthRight = float(DH.lane_width_right)
|
|
|
|
pm.send('frogpilotLateralPlan', frogpilot_lateral_plan_send)
|
|
|
|
def publish_longitudinal(self, sm, pm, mpc):
|
|
frogpilot_longitudinal_plan_send = messaging.new_message('frogpilotLongitudinalPlan')
|
|
frogpilot_longitudinal_plan_send.valid = sm.all_checks(service_list=['carState', 'controlsState'])
|
|
frogpilotLongitudinalPlan = frogpilot_longitudinal_plan_send.frogpilotLongitudinalPlan
|
|
|
|
frogpilotLongitudinalPlan.adjustedCruise = float(min(self.mtsc_target) * (CV.MS_TO_KPH if self.is_metric else CV.MS_TO_MPH))
|
|
frogpilotLongitudinalPlan.conditionalExperimental = self.cem.experimental_mode
|
|
frogpilotLongitudinalPlan.distances = self.x_desired_trajectory.tolist()
|
|
frogpilotLongitudinalPlan.redLight = bool(self.cem.red_light_detected)
|
|
|
|
frogpilotLongitudinalPlan.safeObstacleDistance = mpc.safe_obstacle_distance
|
|
frogpilotLongitudinalPlan.stoppedEquivalenceFactor = mpc.stopped_equivalence_factor
|
|
frogpilotLongitudinalPlan.desiredFollowDistance = mpc.safe_obstacle_distance - mpc.stopped_equivalence_factor
|
|
frogpilotLongitudinalPlan.safeObstacleDistanceStock = mpc.safe_obstacle_distance_stock
|
|
|
|
frogpilotLongitudinalPlan.slcOverridden = self.override_slc
|
|
frogpilotLongitudinalPlan.slcOverriddenSpeed = float(self.overridden_speed)
|
|
frogpilotLongitudinalPlan.slcSpeedLimit = float(self.slc_target)
|
|
frogpilotLongitudinalPlan.slcSpeedLimitOffset = float(SpeedLimitController.offset)
|
|
|
|
pm.send('frogpilotLongitudinalPlan', frogpilot_longitudinal_plan_send)
|
|
|
|
def update_frogpilot_params(self, params):
|
|
self.is_metric = params.get_bool("IsMetric")
|
|
|
|
self.blindspot_path = params.get_bool("CustomUI") and params.get_bool("BlindSpotPath")
|
|
|
|
self.conditional_experimental_mode = params.get_bool("ConditionalExperimental")
|
|
if self.conditional_experimental_mode:
|
|
self.cem.update_frogpilot_params(self.is_metric, params)
|
|
if not params.get_bool("ExperimentalMode"):
|
|
params.put_bool("ExperimentalMode", True)
|
|
|
|
self.custom_personalities = params.get_bool("CustomPersonalities")
|
|
self.aggressive_follow = params.get_int("AggressiveFollow") / 10
|
|
self.standard_follow = params.get_int("StandardFollow") / 10
|
|
self.relaxed_follow = params.get_int("RelaxedFollow") / 10
|
|
self.aggressive_jerk = params.get_int("AggressiveJerk") / 10
|
|
self.standard_jerk = params.get_int("StandardJerk") / 10
|
|
self.relaxed_jerk = params.get_int("RelaxedJerk") / 10
|
|
|
|
lateral_tune = params.get_bool("LateralTune")
|
|
self.average_desired_curvature = params.get_bool("AverageCurvature") and lateral_tune
|
|
|
|
longitudinal_tune = params.get_bool("LongitudinalTune")
|
|
self.acceleration_profile = params.get_int("AccelerationProfile") if longitudinal_tune else 0
|
|
self.aggressive_acceleration = params.get_bool("AggressiveAcceleration") and longitudinal_tune
|
|
self.increased_stopping_distance = params.get_int("StoppingDistance") * (1 if self.is_metric else CV.FOOT_TO_METER) if longitudinal_tune else 0
|
|
self.smoother_braking = params.get_bool("SmoothBraking") and longitudinal_tune
|
|
|
|
self.map_turn_speed_controller = params.get_bool("MTSCEnabled")
|
|
|
|
self.nudgeless = params.get_bool("NudgelessLaneChange")
|
|
self.lane_change_delay = params.get_int("LaneChangeTime") if self.nudgeless else 0
|
|
self.lane_detection = params.get_bool("LaneDetection") if self.nudgeless else False
|
|
self.one_lane_change = params.get_bool("OneLaneChange") if self.nudgeless else False
|
|
|
|
self.speed_limit_controller = params.get_bool("SpeedLimitController")
|
|
if self.speed_limit_controller:
|
|
self.speed_limit_controller_override = params.get_int("SLCOverride")
|
|
SpeedLimitController.update_frogpilot_params()
|
|
|
|
self.turn_desires = params.get_bool("TurnDesires")
|