Added toggle to enable NNFF.

Credit goes to Twilsonco!

https: //github.com/twilsonco
Co-Authored-By: Tim Wilson <7284371+twilsonco@users.noreply.github.com>
This commit is contained in:
FrogAi
2024-02-27 16:34:47 -07:00
parent bc18137016
commit 6266c08db4
453 changed files with 778 additions and 18 deletions

View File

@@ -3,8 +3,10 @@ import os
import numpy as np
import tomllib
from abc import abstractmethod, ABC
from difflib import SequenceMatcher
from enum import StrEnum
from typing import Any, Dict, Optional, Tuple, List, Callable, NamedTuple
from json import load
from typing import Any, Callable, Dict, List, NamedTuple, Optional, Tuple, Union
from cereal import car
from openpilot.common.basedir import BASEDIR
@@ -20,6 +22,8 @@ from openpilot.selfdrive.controls.lib.vehicle_model import VehicleModel
from openpilot.selfdrive.frogpilot.functions.frogpilot_functions import FrogPilotFunctions
params_memory = Params("/dev/shm/params")
ButtonType = car.CarState.ButtonEvent.Type
GearShifter = car.CarState.GearShifter
EventName = car.CarEvent.EventName
@@ -33,7 +37,10 @@ FRICTION_THRESHOLD = 0.3
TORQUE_PARAMS_PATH = os.path.join(BASEDIR, 'selfdrive/car/torque_data/params.toml')
TORQUE_OVERRIDE_PATH = os.path.join(BASEDIR, 'selfdrive/car/torque_data/override.toml')
TORQUE_SUBSTITUTE_PATH = os.path.join(BASEDIR, 'selfdrive/car/torque_data/substitute.toml')
TORQUE_NN_MODEL_PATH = os.path.join(BASEDIR, 'selfdrive/car/torque_data/lat_models')
def similarity(s1:str, s2:str) -> float:
return SequenceMatcher(None, s1, s2).ratio()
class LatControlInputs(NamedTuple):
lateral_acceleration: float
@@ -69,12 +76,114 @@ def get_torque_params(candidate):
return {key: out[i] for i, key in enumerate(params['legend'])}
# Twilsonco's Lateral Neural Network Feedforward
class FluxModel:
# dict used to rename activation functions whose names aren't valid python identifiers
activation_function_names = {'σ': 'sigmoid'}
def __init__(self, params_file, zero_bias=False):
with open(params_file, "r") as f:
params = load(f)
self.input_size = params["input_size"]
self.output_size = params["output_size"]
self.input_mean = np.array(params["input_mean"], dtype=np.float32).T
self.input_std = np.array(params["input_std"], dtype=np.float32).T
self.layers = []
for layer_params in params["layers"]:
W = np.array(layer_params[next(key for key in layer_params.keys() if key.endswith('_W'))], dtype=np.float32).T
b = np.array(layer_params[next(key for key in layer_params.keys() if key.endswith('_b'))], dtype=np.float32).T
if zero_bias:
b = np.zeros_like(b)
activation = layer_params["activation"]
for k, v in self.activation_function_names.items():
activation = activation.replace(k, v)
self.layers.append((W, b, activation))
self.validate_layers()
self.check_for_friction_override()
# Begin activation functions.
# These are called by name using the keys in the model json file
def sigmoid(self, x):
return 1 / (1 + np.exp(-x))
def identity(self, x):
return x
# End activation functions
def forward(self, x):
for W, b, activation in self.layers:
x = getattr(self, activation)(x.dot(W) + b)
return x
def evaluate(self, input_array):
in_len = len(input_array)
if in_len != self.input_size:
# If the input is length 2-4, then it's a simplified evaluation.
# In that case, need to add on zeros to fill out the input array to match the correct length.
if 2 <= in_len:
input_array = input_array + [0] * (self.input_size - in_len)
else:
raise ValueError(f"Input array length {len(input_array)} must be length 2 or greater")
input_array = np.array(input_array, dtype=np.float32)
# Rescale the input array using the input_mean and input_std
input_array = (input_array - self.input_mean) / self.input_std
output_array = self.forward(input_array)
return float(output_array[0, 0])
def validate_layers(self):
for W, b, activation in self.layers:
if not hasattr(self, activation):
raise ValueError(f"Unknown activation: {activation}")
def check_for_friction_override(self):
y = self.evaluate([10.0, 0.0, 0.2])
self.friction_override = (y < 0.1)
def get_nn_model_path(car, eps_firmware) -> Tuple[Union[str, None, float]]:
def check_nn_path(check_model):
model_path = None
max_similarity = -1.0
for f in os.listdir(TORQUE_NN_MODEL_PATH):
if f.endswith(".json") and car in f:
model = f.replace(".json", "").replace(f"{TORQUE_NN_MODEL_PATH}/", "")
similarity_score = similarity(model, check_model)
if similarity_score > max_similarity:
max_similarity = similarity_score
model_path = os.path.join(TORQUE_NN_MODEL_PATH, f)
return model_path, max_similarity
if len(eps_firmware) > 3:
eps_firmware = eps_firmware.replace("\\", "")
check_model = f"{car} {eps_firmware}"
else:
check_model = car
model_path, max_similarity = check_nn_path(check_model)
if max_similarity < 0.9:
check_model = car
model_path, max_similarity = check_nn_path(check_model)
if max_similarity < 0.9:
model_path = None
return model_path, max_similarity
def get_nn_model(car, eps_firmware) -> Tuple[Union[FluxModel, None, float]]:
model, similarity_score = get_nn_model_path(car, eps_firmware)
if model is not None:
model = FluxModel(model)
return model, similarity_score
# generic car and radar interfaces
class CarInterfaceBase(ABC):
def __init__(self, CP, CarController, CarState):
self.CP = CP
self.VM = VehicleModel(CP)
eps_firmware = str(next((fw.fwVersion for fw in CP.carFw if fw.ecu == "eps"), ""))
self.frame = 0
self.steering_unpressed = 0
@@ -102,12 +211,21 @@ class CarInterfaceBase(ABC):
# FrogPilot variables
params = Params()
self.has_lateral_torque_nn = self.initialize_lat_torque_nn(CP.carFingerprint, eps_firmware) and params.get_bool("NNFF") and params.get_bool("LateralTune")
self.belowSteerSpeed_shown = False
self.disable_belowSteerSpeed = False
self.resumeRequired_shown = False
self.disable_resumeRequired = False
def get_ff_nn(self, x):
return self.lat_torque_nn_model.evaluate(x)
def initialize_lat_torque_nn(self, car, eps_firmware):
self.lat_torque_nn_model, _ = get_nn_model(car, eps_firmware)
return (self.lat_torque_nn_model is not None)
@staticmethod
def get_pid_accel_limits(CP, current_speed, cruise_speed, frogpilot_variables):
if frogpilot_variables.sport_plus:
@@ -127,6 +245,15 @@ class CarInterfaceBase(ABC):
ret = CarInterfaceBase.get_std_params(candidate)
ret = cls._get_params(ret, params, candidate, fingerprint, car_fw, experimental_long, docs)
# Enable torque controller for all cars that do not use angle based steering
if ret.steerControlType != car.CarParams.SteerControlType.angle and params.get_bool("LateralTune") and params.get_bool("NNFF"):
CarInterfaceBase.configure_torque_tune(candidate, ret.lateralTuning)
eps_firmware = str(next((fw.fwVersion for fw in car_fw if fw.ecu == "eps"), ""))
model, similarity_score = get_nn_model_path(candidate, eps_firmware)
if model is not None:
params_memory.put_bool("NNFFModelFuzzyMatch", similarity_score < 0.99)
params_memory.put("NNFFModelName", candidate)
# Vehicle mass is published curb weight plus assumed payload such as a human driver; notCars have no assumed payload
if not ret.notCar:
ret.mass = ret.mass + STD_CARGO_KG