Added toggles to customize the lane lines, path, road edges, path edges, show the acceleration/deceleration on the path, lead info, driving logics, adjacent lanes, blind spot path, fps tracker, and an "Unlimited Length" mode that extends the road UI out as far as the model can see.
141 lines
6.7 KiB
Python
141 lines
6.7 KiB
Python
import cereal.messaging as messaging
|
|
import numpy as np
|
|
|
|
from openpilot.common.conversions import Conversions as CV
|
|
from openpilot.common.numpy_fast import interp
|
|
from openpilot.selfdrive.controls.lib.drive_helpers import CONTROL_N, V_CRUISE_MAX
|
|
from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import T_IDXS as T_IDXS_MPC
|
|
from openpilot.selfdrive.controls.lib.longitudinal_planner import A_CRUISE_MIN, A_CRUISE_MAX_VALS, A_CRUISE_MAX_BP, get_max_accel
|
|
from openpilot.selfdrive.modeld.constants import ModelConstants
|
|
|
|
from openpilot.selfdrive.frogpilot.functions.conditional_experimental_mode import ConditionalExperimentalMode
|
|
|
|
# Acceleration profiles - Credit goes to the DragonPilot team!
|
|
# MPH = [0., 35, 35, 40, 40, 45, 45, 67, 67, 67, 123]
|
|
A_CRUISE_MIN_BP_CUSTOM = [0., 2.0, 2.01, 11., 11.01, 18., 18.01, 28., 28.01, 33., 55.]
|
|
# MPH = [0., 6.71, 13.4, 17.9, 24.6, 33.6, 44.7, 55.9, 67.1, 123]
|
|
A_CRUISE_MAX_BP_CUSTOM = [0., 3, 6., 8., 11., 15., 20., 25., 30., 55.]
|
|
|
|
A_CRUISE_MIN_VALS_ECO_TUNE = [-0.480, -0.480, -0.40, -0.40, -0.40, -0.36, -0.32, -0.28, -0.28, -0.25, -0.25]
|
|
A_CRUISE_MAX_VALS_ECO_TUNE = [3.5, 3.3, 1.7, 1.1, .76, .62, .47, .36, .28, .09]
|
|
|
|
A_CRUISE_MIN_VALS_SPORT_TUNE = [-0.500, -0.500, -0.42, -0.42, -0.42, -0.42, -0.40, -0.35, -0.35, -0.30, -0.30]
|
|
A_CRUISE_MAX_VALS_SPORT_TUNE = [3.5, 3.5, 3.0, 2.6, 1.4, 1.0, 0.7, 0.6, .38, .2]
|
|
|
|
def get_min_accel_eco_tune(v_ego):
|
|
return interp(v_ego, A_CRUISE_MIN_BP_CUSTOM, A_CRUISE_MIN_VALS_ECO_TUNE)
|
|
|
|
def get_max_accel_eco_tune(v_ego):
|
|
return interp(v_ego, A_CRUISE_MAX_BP_CUSTOM, A_CRUISE_MAX_VALS_ECO_TUNE)
|
|
|
|
def get_min_accel_sport_tune(v_ego):
|
|
return interp(v_ego, A_CRUISE_MIN_BP_CUSTOM, A_CRUISE_MIN_VALS_SPORT_TUNE)
|
|
|
|
def get_max_accel_sport_tune(v_ego):
|
|
return interp(v_ego, A_CRUISE_MAX_BP_CUSTOM, A_CRUISE_MAX_VALS_SPORT_TUNE)
|
|
|
|
def calculate_lane_width(lane, current_lane, road_edge):
|
|
lane_x, lane_y = np.array(lane.x), np.array(lane.y)
|
|
edge_x, edge_y = np.array(road_edge.x), np.array(road_edge.y)
|
|
current_x, current_y = np.array(current_lane.x), np.array(current_lane.y)
|
|
|
|
lane_y_interp = np.interp(current_x, lane_x[lane_x.argsort()], lane_y[lane_x.argsort()])
|
|
road_edge_y_interp = np.interp(current_x, edge_x[edge_x.argsort()], edge_y[edge_x.argsort()])
|
|
|
|
distance_to_lane = np.mean(np.abs(current_y - lane_y_interp))
|
|
distance_to_road_edge = np.mean(np.abs(current_y - road_edge_y_interp))
|
|
|
|
return min(distance_to_lane, distance_to_road_edge)
|
|
|
|
|
|
class FrogPilotPlanner:
|
|
def __init__(self, params):
|
|
self.cem = ConditionalExperimentalMode()
|
|
|
|
self.v_cruise = 0
|
|
|
|
self.x_desired_trajectory = np.zeros(CONTROL_N)
|
|
|
|
self.update_frogpilot_params(params)
|
|
|
|
def update(self, sm, mpc):
|
|
carState, controlsState, modelData = sm['carState'], sm['controlsState'], sm['modelV2']
|
|
|
|
enabled = controlsState.enabled
|
|
|
|
v_cruise_kph = min(controlsState.vCruise, V_CRUISE_MAX)
|
|
v_cruise = v_cruise_kph * CV.KPH_TO_MS
|
|
v_ego = carState.vEgo
|
|
|
|
# Acceleration profiles
|
|
if self.acceleration_profile == 1:
|
|
self.accel_limits = [get_min_accel_eco_tune(v_ego), get_max_accel_eco_tune(v_ego)]
|
|
elif self.acceleration_profile in (2, 3):
|
|
self.accel_limits = [get_min_accel_sport_tune(v_ego), get_max_accel_sport_tune(v_ego)]
|
|
else:
|
|
self.accel_limits = [A_CRUISE_MIN, get_max_accel(v_ego)]
|
|
|
|
# Conditional Experimental Mode
|
|
if self.conditional_experimental_mode and enabled:
|
|
self.cem.update(carState, sm['frogpilotNavigation'], sm['modelV2'], mpc, sm['radarState'], carState.standstill, v_ego)
|
|
|
|
self.v_cruise = self.update_v_cruise(carState, controlsState, modelData, enabled, v_cruise, v_ego)
|
|
|
|
self.x_desired_trajectory_full = np.interp(ModelConstants.T_IDXS, T_IDXS_MPC, mpc.x_solution)
|
|
self.x_desired_trajectory = self.x_desired_trajectory_full[:CONTROL_N]
|
|
|
|
def update_v_cruise(self, carState, controlsState, modelData, enabled, v_cruise, v_ego):
|
|
v_ego_diff = max(carState.vEgoRaw - carState.vEgoCluster, 0)
|
|
return v_cruise - v_ego_diff
|
|
|
|
def publish_lateral(self, sm, pm, DH):
|
|
frogpilot_lateral_plan_send = messaging.new_message('frogpilotLateralPlan')
|
|
frogpilot_lateral_plan_send.valid = sm.all_checks(service_list=['carState', 'controlsState', 'modelV2'])
|
|
frogpilotLateralPlan = frogpilot_lateral_plan_send.frogpilotLateralPlan
|
|
|
|
frogpilotLateralPlan.laneWidthLeft = float(DH.lane_width_left)
|
|
frogpilotLateralPlan.laneWidthRight = float(DH.lane_width_right)
|
|
|
|
pm.send('frogpilotLateralPlan', frogpilot_lateral_plan_send)
|
|
|
|
def publish_longitudinal(self, sm, pm, mpc):
|
|
frogpilot_longitudinal_plan_send = messaging.new_message('frogpilotLongitudinalPlan')
|
|
frogpilot_longitudinal_plan_send.valid = sm.all_checks(service_list=['carState', 'controlsState'])
|
|
frogpilotLongitudinalPlan = frogpilot_longitudinal_plan_send.frogpilotLongitudinalPlan
|
|
|
|
frogpilotLongitudinalPlan.conditionalExperimental = self.cem.experimental_mode
|
|
frogpilotLongitudinalPlan.distances = self.x_desired_trajectory.tolist()
|
|
|
|
frogpilotLongitudinalPlan.safeObstacleDistance = mpc.safe_obstacle_distance
|
|
frogpilotLongitudinalPlan.stoppedEquivalenceFactor = mpc.stopped_equivalence_factor
|
|
frogpilotLongitudinalPlan.desiredFollowDistance = mpc.safe_obstacle_distance - mpc.stopped_equivalence_factor
|
|
frogpilotLongitudinalPlan.safeObstacleDistanceStock = mpc.safe_obstacle_distance_stock
|
|
|
|
pm.send('frogpilotLongitudinalPlan', frogpilot_longitudinal_plan_send)
|
|
|
|
def update_frogpilot_params(self, params):
|
|
self.is_metric = params.get_bool("IsMetric")
|
|
|
|
self.blindspot_path = params.get_bool("CustomUI") and params.get_bool("BlindSpotPath")
|
|
|
|
self.conditional_experimental_mode = params.get_bool("ConditionalExperimental")
|
|
if self.conditional_experimental_mode:
|
|
self.cem.update_frogpilot_params(self.is_metric, params)
|
|
if not params.get_bool("ExperimentalMode"):
|
|
params.put_bool("ExperimentalMode", True)
|
|
|
|
self.custom_personalities = params.get_bool("CustomPersonalities")
|
|
self.aggressive_follow = params.get_int("AggressiveFollow") / 10
|
|
self.standard_follow = params.get_int("StandardFollow") / 10
|
|
self.relaxed_follow = params.get_int("RelaxedFollow") / 10
|
|
self.aggressive_jerk = params.get_int("AggressiveJerk") / 10
|
|
self.standard_jerk = params.get_int("StandardJerk") / 10
|
|
self.relaxed_jerk = params.get_int("RelaxedJerk") / 10
|
|
|
|
lateral_tune = params.get_bool("LateralTune")
|
|
self.average_desired_curvature = params.get_bool("AverageCurvature") and lateral_tune
|
|
|
|
longitudinal_tune = params.get_bool("LongitudinalTune")
|
|
self.acceleration_profile = params.get_int("AccelerationProfile") if longitudinal_tune else 0
|
|
self.aggressive_acceleration = params.get_bool("AggressiveAcceleration") and longitudinal_tune
|