import cereal.messaging as messaging from openpilot.common.conversions import Conversions as CV from openpilot.selfdrive.controls.lib.drive_helpers import V_CRUISE_MAX class FrogPilotPlanner: def __init__(self, params): self.v_cruise = 0 self.update_frogpilot_params(params) def update(self, sm, mpc): carState, controlsState, modelData = sm['carState'], sm['controlsState'], sm['modelV2'] enabled = controlsState.enabled v_cruise_kph = min(controlsState.vCruise, V_CRUISE_MAX) v_cruise = v_cruise_kph * CV.KPH_TO_MS v_ego = carState.vEgo self.v_cruise = self.update_v_cruise(carState, controlsState, modelData, enabled, v_cruise, v_ego) def update_v_cruise(self, carState, controlsState, modelData, enabled, v_cruise, v_ego): v_ego_diff = max(carState.vEgoRaw - carState.vEgoCluster, 0) return v_cruise - v_ego_diff def publish_lateral(self, sm, pm, DH): frogpilot_lateral_plan_send = messaging.new_message('frogpilotLateralPlan') frogpilot_lateral_plan_send.valid = sm.all_checks(service_list=['carState', 'controlsState', 'modelV2']) frogpilotLateralPlan = frogpilot_lateral_plan_send.frogpilotLateralPlan pm.send('frogpilotLateralPlan', frogpilot_lateral_plan_send) def publish_longitudinal(self, sm, pm, mpc): frogpilot_longitudinal_plan_send = messaging.new_message('frogpilotLongitudinalPlan') frogpilot_longitudinal_plan_send.valid = sm.all_checks(service_list=['carState', 'controlsState']) frogpilotLongitudinalPlan = frogpilot_longitudinal_plan_send.frogpilotLongitudinalPlan pm.send('frogpilotLongitudinalPlan', frogpilot_longitudinal_plan_send) def update_frogpilot_params(self, params): self.is_metric = params.get_bool("IsMetric") lateral_tune = params.get_bool("LateralTune") longitudinal_tune = params.get_bool("LongitudinalTune")