Conditional Experimental Mode

Added toggles for "Conditional Experimental Mode".

Conditions based on road curvature, turn signals, speed, lead speed, navigation instructions, and stop signs/stop lights are all individually toggleable.
This commit is contained in:
FrogAi
2024-01-12 22:39:30 -07:00
parent c844593f66
commit ccf674762e
17 changed files with 378 additions and 12 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 13 KiB

View File

@@ -0,0 +1,197 @@
import numpy as np
from cereal import car
from openpilot.common.conversions import Conversions as CV
from openpilot.common.numpy_fast import interp
from openpilot.common.params import Params
# Constants
PROBABILITY = 0.6 # 60% chance of condition being true
THRESHOLD = 5 # Time threshold (0.25s)
SPEED_LIMIT = 25 # Speed limit for turn signal check
# Lookup table for stop sign / stop light detection
SLOW_DOWN_BP = [0., 10., 20., 30., 40., 50., 55.]
SLOW_DOWN_DISTANCE = [10, 30., 50., 70., 80., 90., 120.]
TRAJECTORY_SIZE = 33
class GenericMovingAverageCalculator:
def __init__(self):
self.data = []
self.total = 0
def add_data(self, value):
if len(self.data) == THRESHOLD:
self.total -= self.data.pop(0)
self.data.append(value)
self.total += value
def get_moving_average(self):
if len(self.data) == 0:
return None
return self.total / len(self.data)
def reset_data(self):
self.data = []
self.total = 0
class ConditionalExperimentalMode:
def __init__(self):
self.params_memory = Params("/dev/shm/params")
self.curve_detected = False
self.experimental_mode = False
self.lead_detected = False
self.lead_detected_previously = False
self.lead_slowing_down = False
self.red_light_detected = False
self.slower_lead_detected = False
self.slowing_down = False
self.previous_status_value = 0
self.previous_v_ego = 0
self.previous_v_lead = 0
self.status_value = 0
self.curvature_gmac = GenericMovingAverageCalculator()
self.lead_detection_gmac = GenericMovingAverageCalculator()
self.lead_slowing_down_gmac = GenericMovingAverageCalculator()
self.slow_down_gmac = GenericMovingAverageCalculator()
self.slow_lead_gmac = GenericMovingAverageCalculator()
self.slowing_down_gmac = GenericMovingAverageCalculator()
def update(self, carState, frogpilotNavigation, modelData, mpc, radarState, standstill, v_ego):
# Update Experimental Mode based on the current driving conditions
condition_met = self.check_conditions(carState, frogpilotNavigation, modelData, standstill, v_ego)
if (not self.experimental_mode and condition_met):
self.experimental_mode = True
elif (self.experimental_mode and not condition_met):
self.experimental_mode = False
self.status_value = 0
# Update the onroad status bar
if self.status_value != self.previous_status_value:
self.previous_status_value = self.status_value
self.params_memory.put_int("CEStatus", self.status_value)
self.update_conditions(modelData, mpc, radarState, v_ego)
# Check conditions for the appropriate state of Experimental Mode
def check_conditions(self, carState, frogpilotNavigation, modelData, standstill, v_ego):
if standstill:
return self.experimental_mode
# Keep Experimental Mode active if slowing down for a red light
if self.slowing_down and self.status_value == 12 and not self.lead_slowing_down:
return True
# Navigation check
if self.navigation and modelData.navEnabled and frogpilotNavigation.navigationConditionMet:
self.status_value = 5
return True
# Speed check
if (not self.lead_detected and v_ego < self.limit) or (self.lead_detected and v_ego < self.limit_lead):
self.status_value = 7 if self.lead_detected else 8
return True
# Slower lead check
if self.slower_lead and self.slower_lead_detected:
self.status_value = 9
return True
# Turn signal check
if self.signal and v_ego < SPEED_LIMIT and (carState.leftBlinker or carState.rightBlinker):
self.status_value = 10
return True
# Road curvature check
if self.curves and self.curve_detected and (self.curves_lead or not self.lead_detected):
self.status_value = 11
return True
# Stop sign and light check
if self.stop_lights and self.red_light_detected and (self.stop_lights_lead or not self.lead_slowing_down):
self.status_value = 12
return True
return False
def update_conditions(self, modelData, mpc, radarState, v_ego):
self.lead_detection(radarState)
self.road_curvature(modelData, v_ego)
self.slow_lead(mpc, radarState, v_ego)
self.stop_sign_and_light(modelData, v_ego)
self.v_ego_slowing_down(v_ego)
self.v_lead_slowing_down(mpc, radarState)
def v_ego_slowing_down(self, v_ego):
self.slowing_down_gmac.add_data(v_ego < self.previous_v_ego)
self.slowing_down = self.slowing_down_gmac.get_moving_average() >= PROBABILITY
self.previous_v_ego = v_ego
def v_lead_slowing_down(self, mpc, radarState):
if self.lead_detected:
v_lead = radarState.leadOne.vLead
self.lead_slowing_down_gmac.add_data(v_lead < self.previous_v_lead or v_lead < 1 or radarState.leadOne.dRel <= v_lead * mpc.t_follow)
self.lead_slowing_down = self.lead_slowing_down_gmac.get_moving_average() >= PROBABILITY
self.previous_v_lead = v_lead
else:
self.lead_slowing_down_gmac.reset_data()
self.lead_slowing_down = False
self.previous_v_lead = 0
# Lead detection
def lead_detection(self, radarState):
self.lead_detection_gmac.add_data(radarState.leadOne.status)
self.lead_detected = self.lead_detection_gmac.get_moving_average() >= PROBABILITY
# Determine the road curvature - Credit goes to to Pfeiferj!
def road_curvature(self, modelData, v_ego):
predicted_velocities = np.array(modelData.velocity.x)
if predicted_velocities.size:
curvature_ratios = np.abs(np.array(modelData.acceleration.y)) / (predicted_velocities**2)
curvature = np.amax(curvature_ratios * (v_ego**2))
# Setting an upper limit of "5.0" helps prevent it activating at stop lights
if curvature > 5.0:
self.curvature_gmac.reset_data()
self.curve_detected = False
else:
self.curvature_gmac.add_data(curvature > 1.6 or self.curve_detected and curvature > 1.1)
self.curve_detected = self.curvature_gmac.get_moving_average() >= PROBABILITY
else:
self.curvature_gmac.reset_data()
# Slower lead detection - Credit goes to the DragonPilot team!
def slow_lead(self, mpc, radarState, v_ego):
if not self.lead_detected and self.lead_detected_previously:
self.slow_lead_gmac.reset_data()
self.slower_lead_detected = False
if self.lead_detected and v_ego >= 0.01:
self.slow_lead_gmac.add_data(radarState.leadOne.dRel < (v_ego - 1) * mpc.t_follow)
self.slower_lead_detected = self.slow_lead_gmac.get_moving_average() >= PROBABILITY
self.lead_detected_previously = self.lead_detected
# Stop sign/stop light detection - Credit goes to the DragonPilot team!
def stop_sign_and_light(self, modelData, v_ego):
# Check if the model data is consistent and wants to stop
model_check = len(modelData.orientation.x) == len(modelData.position.x) == TRAJECTORY_SIZE
model_stopping = modelData.position.x[TRAJECTORY_SIZE - 1] < interp(v_ego * CV.MS_TO_KPH, SLOW_DOWN_BP, SLOW_DOWN_DISTANCE)
self.slow_down_gmac.add_data(model_check and model_stopping and not self.curve_detected and not self.slower_lead_detected)
self.red_light_detected = self.slow_down_gmac.get_moving_average() >= PROBABILITY
def update_frogpilot_params(self, is_metric, params):
self.curves = params.get_bool("CECurves")
self.curves_lead = params.get_bool("CECurvesLead")
self.limit = params.get_int("CESpeed") * (CV.KPH_TO_MS if is_metric else CV.MPH_TO_MS)
self.limit_lead = params.get_int("CESpeedLead") * (CV.KPH_TO_MS if is_metric else CV.MPH_TO_MS)
self.navigation = params.get_bool("CENavigation")
self.signal = params.get_bool("CESignal")
self.slower_lead = params.get_bool("CESlowerLead")
self.stop_lights = params.get_bool("CEStopLights")
self.stop_lights_lead = params.get_bool("CEStopLightsLead")

View File

@@ -8,6 +8,8 @@ from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import T_IDX
from openpilot.selfdrive.controls.lib.longitudinal_planner import A_CRUISE_MIN, A_CRUISE_MAX_VALS, A_CRUISE_MAX_BP, get_max_accel
from openpilot.selfdrive.modeld.constants import ModelConstants
from openpilot.selfdrive.frogpilot.functions.conditional_experimental_mode import ConditionalExperimentalMode
# Acceleration profiles - Credit goes to the DragonPilot team!
# MPH = [0., 35, 35, 40, 40, 45, 45, 67, 67, 67, 123]
A_CRUISE_MIN_BP_CUSTOM = [0., 2.0, 2.01, 11., 11.01, 18., 18.01, 28., 28.01, 33., 55.]
@@ -35,6 +37,8 @@ def get_max_accel_sport_tune(v_ego):
class FrogPilotPlanner:
def __init__(self, params):
self.cem = ConditionalExperimentalMode()
self.v_cruise = 0
self.x_desired_trajectory = np.zeros(CONTROL_N)
@@ -58,6 +62,10 @@ class FrogPilotPlanner:
else:
self.accel_limits = [A_CRUISE_MIN, get_max_accel(v_ego)]
# Conditional Experimental Mode
if self.conditional_experimental_mode and enabled:
self.cem.update(carState, sm['frogpilotNavigation'], sm['modelV2'], mpc, sm['radarState'], carState.standstill, v_ego)
self.v_cruise = self.update_v_cruise(carState, controlsState, modelData, enabled, v_cruise, v_ego)
self.x_desired_trajectory_full = np.interp(ModelConstants.T_IDXS, T_IDXS_MPC, mpc.x_solution)
@@ -79,6 +87,7 @@ class FrogPilotPlanner:
frogpilot_longitudinal_plan_send.valid = sm.all_checks(service_list=['carState', 'controlsState'])
frogpilotLongitudinalPlan = frogpilot_longitudinal_plan_send.frogpilotLongitudinalPlan
frogpilotLongitudinalPlan.conditionalExperimental = self.cem.experimental_mode
frogpilotLongitudinalPlan.distances = self.x_desired_trajectory.tolist()
pm.send('frogpilotLongitudinalPlan', frogpilot_longitudinal_plan_send)
@@ -86,6 +95,12 @@ class FrogPilotPlanner:
def update_frogpilot_params(self, params):
self.is_metric = params.get_bool("IsMetric")
self.conditional_experimental_mode = params.get_bool("ConditionalExperimental")
if self.conditional_experimental_mode:
self.cem.update_frogpilot_params(self.is_metric, params)
if not params.get_bool("ExperimentalMode"):
params.put_bool("ExperimentalMode", True)
lateral_tune = params.get_bool("LateralTune")
self.average_desired_curvature = params.get_bool("AverageCurvature") and lateral_tune

View File

@@ -5,6 +5,13 @@ FrogPilotControlsPanel::FrogPilotControlsPanel(SettingsWindow *parent) : FrogPil
const std::vector<std::tuple<QString, QString, QString, QString>> controlToggles {
{"AlwaysOnLateral", "Always on Lateral", "Maintain openpilot lateral control when the brake or gas pedals are used.\n\nDeactivation occurs only through the 'Cruise Control' button.", "../frogpilot/assets/toggle_icons/icon_always_on_lateral.png"},
{"ConditionalExperimental", "Conditional Experimental Mode", "Automatically switches to 'Experimental Mode' under predefined conditions.", "../frogpilot/assets/toggle_icons/icon_conditional.png"},
{"CECurves", "Curve Detected Ahead", "Switch to 'Experimental Mode' when a curve is detected.", ""},
{"CENavigation", "Navigation Based", "Switch to 'Experimental Mode' based on navigation data. (i.e. Intersections, stop signs, etc.)", ""},
{"CESlowerLead", "Slower Lead Detected Ahead", "Switch to 'Experimental Mode' when a slower lead vehicle is detected ahead.", ""},
{"CEStopLights", "Stop Lights and Stop Signs", "Switch to 'Experimental Mode' when a stop light or stop sign is detected.", ""},
{"CESignal", "Turn Signal When Below Highway Speeds", "Switch to 'Experimental Mode' when using turn signals below highway speeds to help assit with turns.", ""},
{"LateralTune", "Lateral Tuning", "Modify openpilot's steering behavior.", "../frogpilot/assets/toggle_icons/icon_lateral_tune.png"},
{"AverageCurvature", "Average Desired Curvature", "Use Pfeiferj's distance-based curvature adjustment for improved curve handling.", ""},
@@ -31,6 +38,40 @@ FrogPilotControlsPanel::FrogPilotControlsPanel(SettingsWindow *parent) : FrogPil
}
});
} else if (param == "ConditionalExperimental") {
FrogPilotParamManageControl *conditionalExperimentalToggle = new FrogPilotParamManageControl(param, title, desc, icon, this);
QObject::connect(conditionalExperimentalToggle, &FrogPilotParamManageControl::manageButtonClicked, this, [this]() {
parentToggleClicked();
conditionalSpeedsImperial->setVisible(!isMetric);
conditionalSpeedsMetric->setVisible(isMetric);
for (auto &[key, toggle] : toggles) {
toggle->setVisible(conditionalExperimentalKeys.find(key.c_str()) != conditionalExperimentalKeys.end());
}
});
toggle = conditionalExperimentalToggle;
} else if (param == "CECurves") {
FrogPilotParamValueControl *CESpeedImperial = new FrogPilotParamValueControl("CESpeed", "Below", "Switch to 'Experimental Mode' below this speed in absence of a lead vehicle.", "", 0, 99,
std::map<int, QString>(), this, false, " mph");
FrogPilotParamValueControl *CESpeedLeadImperial = new FrogPilotParamValueControl("CESpeedLead", " w/Lead", "Switch to 'Experimental Mode' below this speed when following a lead vehicle.", "", 0, 99,
std::map<int, QString>(), this, false, " mph");
conditionalSpeedsImperial = new FrogPilotDualParamControl(CESpeedImperial, CESpeedLeadImperial, this);
addItem(conditionalSpeedsImperial);
FrogPilotParamValueControl *CESpeedMetric = new FrogPilotParamValueControl("CESpeed", "Below", "Switch to 'Experimental Mode' below this speed in absence of a lead vehicle.", "", 0, 150,
std::map<int, QString>(), this, false, " kph");
FrogPilotParamValueControl *CESpeedLeadMetric = new FrogPilotParamValueControl("CESpeedLead", " w/Lead", "Switch to 'Experimental Mode' below this speed when following a lead vehicle.", "",
0, 150, std::map<int, QString>(), this, false, " kph");
conditionalSpeedsMetric = new FrogPilotDualParamControl(CESpeedMetric, CESpeedLeadMetric, this);
addItem(conditionalSpeedsMetric);
std::vector<QString> curveToggles{tr("CECurvesLead")};
std::vector<QString> curveToggleNames{tr("With Lead")};
toggle = new FrogPilotParamToggleControl(param, title, desc, icon, curveToggles, curveToggleNames);
} else if (param == "CEStopLights") {
std::vector<QString> stopLightToggles{tr("CEStopLightsLead")};
std::vector<QString> stopLightToggleNames{tr("With Lead")};
toggle = new FrogPilotParamToggleControl(param, title, desc, icon, stopLightToggles, stopLightToggleNames);
} else if (param == "LateralTune") {
FrogPilotParamManageControl *lateralTuneToggle = new FrogPilotParamManageControl(param, title, desc, icon, this);
QObject::connect(lateralTuneToggle, &FrogPilotParamManageControl::manageButtonClicked, this, [this]() {
@@ -95,7 +136,7 @@ FrogPilotControlsPanel::FrogPilotControlsPanel(SettingsWindow *parent) : FrogPil
});
}
conditionalExperimentalKeys = {};
conditionalExperimentalKeys = {"CECurves", "CECurvesLead", "CESlowerLead", "CENavigation", "CEStopLights", "CESignal"};
fireTheBabysitterKeys = {};
laneChangeKeys = {};
lateralTuneKeys = {"AverageCurvature"};
@@ -126,6 +167,8 @@ void FrogPilotControlsPanel::updateMetric() {
if (isMetric != previousIsMetric) {
double distanceConversion = isMetric ? FOOT_TO_METER : METER_TO_FOOT;
double speedConversion = isMetric ? MILE_TO_KM : KM_TO_MILE;
params.putInt("CESpeed", std::nearbyint(params.getInt("CESpeed") * speedConversion));
params.putInt("CESpeedLead", std::nearbyint(params.getInt("CESpeedLead") * speedConversion));
}
if (isMetric) {
@@ -136,12 +179,18 @@ void FrogPilotControlsPanel::updateMetric() {
}
void FrogPilotControlsPanel::parentToggleClicked() {
conditionalSpeedsImperial->setVisible(false);
conditionalSpeedsMetric->setVisible(false);
this->openParentToggle();
}
void FrogPilotControlsPanel::hideSubToggles() {
conditionalSpeedsImperial->setVisible(false);
conditionalSpeedsMetric->setVisible(false);
for (auto &[key, toggle] : toggles) {
const bool subToggles = conditionalExperimentalKeys.find(key.c_str()) != conditionalExperimentalKeys.end() ||
bool subToggles = conditionalExperimentalKeys.find(key.c_str()) != conditionalExperimentalKeys.end() ||
fireTheBabysitterKeys.find(key.c_str()) != fireTheBabysitterKeys.end() ||
laneChangeKeys.find(key.c_str()) != laneChangeKeys.end() ||
lateralTuneKeys.find(key.c_str()) != lateralTuneKeys.end() ||

View File

@@ -22,6 +22,9 @@ private:
void updateMetric();
void updateToggles();
FrogPilotDualParamControl *conditionalSpeedsImperial;
FrogPilotDualParamControl *conditionalSpeedsMetric;
std::set<QString> conditionalExperimentalKeys;
std::set<QString> fireTheBabysitterKeys;
std::set<QString> laneChangeKeys;