Conditional Experimental Mode
Added toggles for "Conditional Experimental Mode". Conditions based on road curvature, turn signals, speed, lead speed, navigation instructions, and stop signs/stop lights are all individually toggleable. Co-Authored-By: eFini <16603033+efinilan@users.noreply.github.com> Co-Authored-By: Kumar <36933347+rav4kumar@users.noreply.github.com>
This commit is contained in:
191
selfdrive/frogpilot/functions/conditional_experimental_mode.py
Normal file
191
selfdrive/frogpilot/functions/conditional_experimental_mode.py
Normal file
@@ -0,0 +1,191 @@
|
||||
from openpilot.common.conversions import Conversions as CV
|
||||
from openpilot.common.numpy_fast import interp
|
||||
|
||||
from openpilot.selfdrive.frogpilot.functions.frogpilot_functions import CITY_SPEED_LIMIT, CRUISING_SPEED, MovingAverageCalculator, PROBABILITY
|
||||
|
||||
|
||||
# Lookup table for stop sign / stop light detection
|
||||
SLOW_DOWN_BP = [0., 10., 20., 30., 40., 50., 55., 60.]
|
||||
SLOW_DOWN_DISTANCE = [20, 30., 50., 70., 80., 90., 105., 120.]
|
||||
TRAJECTORY_SIZE = 33
|
||||
|
||||
|
||||
class ConditionalExperimentalMode:
|
||||
def __init__(self, params_memory):
|
||||
self.params_memory = params_memory
|
||||
|
||||
self.curve_detected = False
|
||||
self.experimental_mode = False
|
||||
self.lead_detected = False
|
||||
self.lead_stopping = False
|
||||
self.red_light_detected = False
|
||||
self.slower_lead_detected = False
|
||||
|
||||
self.previous_status_value = 0
|
||||
self.previous_v_ego = 0
|
||||
self.previous_v_lead = 0
|
||||
self.status_value = 0
|
||||
|
||||
self.curvature_mac = MovingAverageCalculator()
|
||||
self.lead_detection_mac = MovingAverageCalculator()
|
||||
self.lead_slowing_down_mac = MovingAverageCalculator()
|
||||
self.slow_lead_mac = MovingAverageCalculator()
|
||||
self.slowing_down_mac = MovingAverageCalculator()
|
||||
self.stop_light_mac = MovingAverageCalculator()
|
||||
|
||||
def update(self, carState, enabled, frogpilotNavigation, modelData, radarState, road_curvature, stop_distance, t_follow, v_ego):
|
||||
lead = radarState.leadOne
|
||||
v_lead = lead.vLead
|
||||
|
||||
# Update Experimental Mode based on the current driving conditions
|
||||
condition_met = self.check_conditions(carState, frogpilotNavigation, lead, modelData, stop_distance, v_ego)
|
||||
if (not self.experimental_mode and condition_met) and enabled:
|
||||
self.experimental_mode = True
|
||||
elif (self.experimental_mode and not condition_met) or not enabled:
|
||||
self.experimental_mode = False
|
||||
self.status_value = 0
|
||||
|
||||
# Update the onroad status bar
|
||||
if self.status_value != self.previous_status_value:
|
||||
self.params_memory.put_int("CEStatus", self.status_value)
|
||||
self.previous_status_value = self.status_value
|
||||
|
||||
self.update_conditions(lead, modelData, radarState, road_curvature, stop_distance, t_follow, v_ego, v_lead)
|
||||
|
||||
# Check conditions for the appropriate state of Experimental Mode
|
||||
def check_conditions(self, carState, frogpilotNavigation, lead, modelData, stop_distance, v_ego):
|
||||
if carState.standstill:
|
||||
self.status_value = 0
|
||||
return self.experimental_mode
|
||||
|
||||
# Keep Experimental Mode active if stopping for a red light
|
||||
if self.status_value == 15 and self.slowing_down(v_ego):
|
||||
return True
|
||||
|
||||
# Navigation check
|
||||
if self.navigation and modelData.navEnabled and (frogpilotNavigation.approachingIntersection or frogpilotNavigation.approachingTurn) and (self.navigation_lead or not self.lead_detected):
|
||||
self.status_value = 7 if frogpilotNavigation.approachingIntersection else 8
|
||||
return True
|
||||
|
||||
# Speed check
|
||||
if (not self.lead_detected and v_ego <= self.limit) or (self.lead_detected and v_ego <= self.limit_lead):
|
||||
self.status_value = 10 if self.lead_detected else 11
|
||||
return True
|
||||
|
||||
# Slower lead check
|
||||
if self.slower_lead and self.slower_lead_detected:
|
||||
self.status_value = 12
|
||||
return True
|
||||
|
||||
# Turn signal check
|
||||
if self.signal and v_ego <= CITY_SPEED_LIMIT and (carState.leftBlinker or carState.rightBlinker):
|
||||
self.status_value = 13
|
||||
return True
|
||||
|
||||
# Road curvature check
|
||||
if self.curves and self.curve_detected:
|
||||
self.status_value = 14
|
||||
return True
|
||||
|
||||
# Stop sign and light check
|
||||
if self.stop_lights and self.red_light_detected:
|
||||
self.status_value = 15
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def update_conditions(self, lead, modelData, radarState, road_curvature, stop_distance, t_follow, v_ego, v_lead):
|
||||
self.lead_detection(lead)
|
||||
self.lead_slowing_down(lead, t_follow, v_lead)
|
||||
self.road_curvature(road_curvature)
|
||||
self.slow_lead(lead, stop_distance, t_follow, v_ego, v_lead)
|
||||
self.stop_sign_and_light(modelData, v_ego)
|
||||
|
||||
# Lead detection
|
||||
def lead_detection(self, lead):
|
||||
lead_status = lead.status
|
||||
|
||||
self.lead_detection_mac.add_data(lead_status)
|
||||
self.lead_detected = self.lead_detection_mac.get_moving_average() >= PROBABILITY
|
||||
|
||||
def lead_slowing_down(self, lead, t_follow, v_lead):
|
||||
if self.lead_detected:
|
||||
lead_close = lead.dRel <= v_lead * t_follow
|
||||
lead_slowing_down = v_lead < self.previous_v_lead
|
||||
lead_stopped = v_lead < 1
|
||||
|
||||
self.previous_v_lead = v_lead
|
||||
|
||||
self.lead_slowing_down_mac.add_data(lead_close or lead_slowing_down or lead_stopped)
|
||||
self.lead_stopping = self.lead_slowing_down_mac.get_moving_average() >= PROBABILITY
|
||||
else:
|
||||
self.lead_slowing_down_mac.reset_data()
|
||||
self.lead_stopping = False
|
||||
self.previous_v_lead = 0
|
||||
|
||||
# Determine if we're slowing down for a potential stop
|
||||
def slowing_down(self, v_ego):
|
||||
slowing_down = v_ego < self.previous_v_ego
|
||||
speed_check = v_ego < CRUISING_SPEED
|
||||
|
||||
self.previous_v_ego = v_ego
|
||||
|
||||
self.slowing_down_mac.add_data(slowing_down and speed_check)
|
||||
return self.slowing_down_mac.get_moving_average() >= PROBABILITY
|
||||
|
||||
# Determine the road curvature - Credit goes to to Pfeiferj!
|
||||
def road_curvature(self, road_curvature):
|
||||
lead_check = self.curves_lead or not self.lead_detected
|
||||
|
||||
if lead_check and not self.red_light_detected:
|
||||
# Setting a limit of 5.0 helps prevent it triggering for red lights
|
||||
curve_detected = 5.0 >= road_curvature > 1.6
|
||||
curve_active = road_curvature > 1.1 and self.curve_detected
|
||||
|
||||
self.curvature_mac.add_data(curve_detected or curve_active)
|
||||
self.curve_detected = self.curvature_mac.get_moving_average() >= PROBABILITY
|
||||
else:
|
||||
self.curvature_mac.reset_data()
|
||||
self.curve_detected = False
|
||||
|
||||
# Slower lead detection - Credit goes to the DragonPilot team!
|
||||
def slow_lead(self, lead, stop_distance, t_follow, v_ego, v_lead):
|
||||
if self.lead_detected:
|
||||
slower_lead_ahead = lead.dRel < (v_ego - 1) * t_follow
|
||||
|
||||
self.slow_lead_mac.add_data(slower_lead_ahead)
|
||||
self.slower_lead_detected = self.slow_lead_mac.get_moving_average() >= PROBABILITY
|
||||
else:
|
||||
self.slow_lead_mac.reset_data()
|
||||
self.slower_lead_detected = False
|
||||
|
||||
# Stop sign/stop light detection - Credit goes to the DragonPilot team!
|
||||
def stop_sign_and_light(self, modelData, v_ego):
|
||||
lead_check = self.stop_lights_lead or not self.lead_stopping
|
||||
|
||||
# Check if the model data is consistent and wants to stop
|
||||
model_check = len(modelData.orientation.x) == len(modelData.position.x) == TRAJECTORY_SIZE
|
||||
model_stopping = modelData.position.x[TRAJECTORY_SIZE - 1] < interp(v_ego * CV.MS_TO_KPH, SLOW_DOWN_BP, SLOW_DOWN_DISTANCE)
|
||||
|
||||
# Filter out any other reasons the model may want to slow down
|
||||
model_filtered = not (self.curve_detected or self.slower_lead_detected)
|
||||
|
||||
self.stop_light_mac.add_data(lead_check and model_check and model_stopping and model_filtered)
|
||||
self.red_light_detected = self.stop_light_mac.get_moving_average() >= PROBABILITY
|
||||
|
||||
def update_frogpilot_params(self, is_metric, params):
|
||||
self.curves = params.get_bool("CECurves")
|
||||
self.curves_lead = self.curves and params.get_bool("CECurvesLead")
|
||||
|
||||
self.limit = params.get_int("CESpeed") * (CV.KPH_TO_MS if is_metric else CV.MPH_TO_MS)
|
||||
self.limit_lead = params.get_int("CESpeedLead") * (CV.KPH_TO_MS if is_metric else CV.MPH_TO_MS)
|
||||
|
||||
self.navigation = params.get_bool("CENavigation")
|
||||
self.navigation_lead = self.navigation and params.get_bool("CENavigationLead")
|
||||
|
||||
self.signal = params.get_bool("CESignal")
|
||||
|
||||
self.slower_lead = params.get_bool("CESlowerLead")
|
||||
|
||||
self.stop_lights = params.get_bool("CEStopLights")
|
||||
self.stop_lights_lead = self.stop_lights and params.get_bool("CEStopLightsLead")
|
||||
@@ -24,6 +24,28 @@ A_CRUISE_MAX_VALS_ECO = [3.5, 3.2, 2.3, 2.0, 1.15, .80, .58, .36, .30, .091]
|
||||
A_CRUISE_MIN_VALS_SPORT = [-0.50, -0.52, -0.55, -0.57, -0.60]
|
||||
A_CRUISE_MAX_VALS_SPORT = [3.5, 3.5, 3.3, 2.8, 1.5, 1.0, .75, .6, .38, .2]
|
||||
|
||||
|
||||
class MovingAverageCalculator:
|
||||
def __init__(self):
|
||||
self.data = []
|
||||
self.total = 0
|
||||
|
||||
def add_data(self, value):
|
||||
if len(self.data) == THRESHOLD:
|
||||
self.total -= self.data.pop(0)
|
||||
self.data.append(value)
|
||||
self.total += value
|
||||
|
||||
def get_moving_average(self):
|
||||
if len(self.data) == 0:
|
||||
return None
|
||||
return self.total / len(self.data)
|
||||
|
||||
def reset_data(self):
|
||||
self.data = []
|
||||
self.total = 0
|
||||
|
||||
|
||||
class FrogPilotFunctions:
|
||||
def __init__(self) -> None:
|
||||
self.params = Params()
|
||||
@@ -57,3 +79,9 @@ class FrogPilotFunctions:
|
||||
distance_to_road_edge = np.mean(np.abs(current_y - road_edge_y_interp))
|
||||
|
||||
return min(distance_to_lane, distance_to_road_edge)
|
||||
|
||||
@staticmethod
|
||||
def road_curvature(modelData, v_ego):
|
||||
predicted_velocities = np.array(modelData.velocity.x)
|
||||
curvature_ratios = np.abs(np.array(modelData.acceleration.y)) / (predicted_velocities**2)
|
||||
return np.amax(curvature_ratios * (v_ego**2))
|
||||
|
||||
@@ -3,10 +3,13 @@ import cereal.messaging as messaging
|
||||
from openpilot.common.conversions import Conversions as CV
|
||||
from openpilot.selfdrive.car.interfaces import ACCEL_MIN, ACCEL_MAX
|
||||
from openpilot.selfdrive.controls.lib.desire_helper import LANE_CHANGE_SPEED_MIN
|
||||
from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import STOP_DISTANCE
|
||||
from openpilot.selfdrive.controls.lib.longitudinal_planner import A_CRUISE_MIN, get_max_accel
|
||||
|
||||
from openpilot.selfdrive.frogpilot.functions.frogpilot_functions import CRUISING_SPEED, FrogPilotFunctions
|
||||
|
||||
from openpilot.selfdrive.frogpilot.functions.conditional_experimental_mode import ConditionalExperimentalMode
|
||||
|
||||
class FrogPilotPlanner:
|
||||
def __init__(self, CP, params, params_memory):
|
||||
self.CP = CP
|
||||
@@ -14,6 +17,10 @@ class FrogPilotPlanner:
|
||||
|
||||
self.fpf = FrogPilotFunctions()
|
||||
|
||||
self.cem = ConditionalExperimentalMode(self.params_memory)
|
||||
|
||||
self.road_curvature = 0
|
||||
self.stop_distance = 0
|
||||
self.v_cruise = 0
|
||||
|
||||
self.accel_limits = [A_CRUISE_MIN, get_max_accel(0)]
|
||||
@@ -45,6 +52,10 @@ class FrogPilotPlanner:
|
||||
|
||||
self.accel_limits = [min_accel, max_accel]
|
||||
|
||||
# Update Conditional Experimental Mode
|
||||
if self.conditional_experimental_mode and self.CP.openpilotLongitudinalControl:
|
||||
self.cem.update(carState, enabled, sm['frogpilotNavigation'], modelData, sm['radarState'], self.road_curvature, self.stop_distance, mpc.t_follow, v_ego)
|
||||
|
||||
# Update the current lane widths
|
||||
check_lane_width = self.blind_spot_path
|
||||
if check_lane_width and v_ego >= LANE_CHANGE_SPEED_MIN:
|
||||
@@ -54,6 +65,12 @@ class FrogPilotPlanner:
|
||||
self.lane_width_left = 0
|
||||
self.lane_width_right = 0
|
||||
|
||||
# Update the current road curvature
|
||||
self.road_curvature = self.fpf.road_curvature(modelData, v_ego)
|
||||
|
||||
# Update the desired stopping distance
|
||||
self.stop_distance = STOP_DISTANCE
|
||||
|
||||
# Update the max allowed speed
|
||||
self.v_cruise = self.update_v_cruise(carState, controlsState, enabled, modelData, v_cruise, v_ego)
|
||||
|
||||
@@ -75,6 +92,8 @@ class FrogPilotPlanner:
|
||||
frogpilot_plan_send.valid = sm.all_checks(service_list=['carState', 'controlsState'])
|
||||
frogpilotPlan = frogpilot_plan_send.frogpilotPlan
|
||||
|
||||
frogpilotPlan.conditionalExperimental = self.cem.experimental_mode
|
||||
|
||||
frogpilotPlan.laneWidthLeft = self.lane_width_left
|
||||
frogpilotPlan.laneWidthRight = self.lane_width_right
|
||||
|
||||
@@ -83,6 +102,11 @@ class FrogPilotPlanner:
|
||||
def update_frogpilot_params(self, params):
|
||||
self.is_metric = params.get_bool("IsMetric")
|
||||
|
||||
self.conditional_experimental_mode = params.get_bool("ConditionalExperimental")
|
||||
if self.conditional_experimental_mode:
|
||||
self.cem.update_frogpilot_params(self.is_metric, params)
|
||||
params.put_bool("ExperimentalMode", True)
|
||||
|
||||
custom_ui = params.get_bool("CustomUI")
|
||||
self.blind_spot_path = custom_ui and params.get_bool("BlindSpotPath")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user