NNFF
Added toggle to enable NNFF. Credit goes to Twilsonco! https: //github.com/twilsonco Co-Authored-By: Tim Wilson <7284371+twilsonco@users.noreply.github.com>
This commit is contained in:
@@ -3,8 +3,10 @@ import time
|
||||
import numpy as np
|
||||
import tomllib
|
||||
from abc import abstractmethod, ABC
|
||||
from difflib import SequenceMatcher
|
||||
from enum import StrEnum
|
||||
from typing import Any, Dict, Optional, Tuple, List, Callable
|
||||
from json import load
|
||||
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
|
||||
|
||||
from cereal import car
|
||||
from openpilot.common.basedir import BASEDIR
|
||||
@@ -32,7 +34,10 @@ FRICTION_THRESHOLD = 0.3
|
||||
TORQUE_PARAMS_PATH = os.path.join(BASEDIR, 'selfdrive/car/torque_data/params.toml')
|
||||
TORQUE_OVERRIDE_PATH = os.path.join(BASEDIR, 'selfdrive/car/torque_data/override.toml')
|
||||
TORQUE_SUBSTITUTE_PATH = os.path.join(BASEDIR, 'selfdrive/car/torque_data/substitute.toml')
|
||||
TORQUE_NN_MODEL_PATH = os.path.join(BASEDIR, 'selfdrive/car/torque_data/lat_models')
|
||||
|
||||
def similarity(s1:str, s2:str) -> float:
|
||||
return SequenceMatcher(None, s1, s2).ratio()
|
||||
|
||||
def get_torque_params(candidate):
|
||||
with open(TORQUE_SUBSTITUTE_PATH, 'rb') as f:
|
||||
@@ -58,12 +63,114 @@ def get_torque_params(candidate):
|
||||
return {key: out[i] for i, key in enumerate(params['legend'])}
|
||||
|
||||
|
||||
# Twilsonco's Lateral Neural Network Feedforward
|
||||
class FluxModel:
|
||||
# dict used to rename activation functions whose names aren't valid python identifiers
|
||||
activation_function_names = {'σ': 'sigmoid'}
|
||||
def __init__(self, params_file, zero_bias=False):
|
||||
with open(params_file, "r") as f:
|
||||
params = load(f)
|
||||
|
||||
self.input_size = params["input_size"]
|
||||
self.output_size = params["output_size"]
|
||||
self.input_mean = np.array(params["input_mean"], dtype=np.float32).T
|
||||
self.input_std = np.array(params["input_std"], dtype=np.float32).T
|
||||
self.layers = []
|
||||
|
||||
for layer_params in params["layers"]:
|
||||
W = np.array(layer_params[next(key for key in layer_params.keys() if key.endswith('_W'))], dtype=np.float32).T
|
||||
b = np.array(layer_params[next(key for key in layer_params.keys() if key.endswith('_b'))], dtype=np.float32).T
|
||||
if zero_bias:
|
||||
b = np.zeros_like(b)
|
||||
activation = layer_params["activation"]
|
||||
for k, v in self.activation_function_names.items():
|
||||
activation = activation.replace(k, v)
|
||||
self.layers.append((W, b, activation))
|
||||
|
||||
self.validate_layers()
|
||||
self.check_for_friction_override()
|
||||
|
||||
# Begin activation functions.
|
||||
# These are called by name using the keys in the model json file
|
||||
def sigmoid(self, x):
|
||||
return 1 / (1 + np.exp(-x))
|
||||
|
||||
def identity(self, x):
|
||||
return x
|
||||
# End activation functions
|
||||
|
||||
def forward(self, x):
|
||||
for W, b, activation in self.layers:
|
||||
x = getattr(self, activation)(x.dot(W) + b)
|
||||
return x
|
||||
|
||||
def evaluate(self, input_array):
|
||||
in_len = len(input_array)
|
||||
if in_len != self.input_size:
|
||||
# If the input is length 2-4, then it's a simplified evaluation.
|
||||
# In that case, need to add on zeros to fill out the input array to match the correct length.
|
||||
if 2 <= in_len:
|
||||
input_array = input_array + [0] * (self.input_size - in_len)
|
||||
else:
|
||||
raise ValueError(f"Input array length {len(input_array)} must be length 2 or greater")
|
||||
|
||||
input_array = np.array(input_array, dtype=np.float32)
|
||||
|
||||
# Rescale the input array using the input_mean and input_std
|
||||
input_array = (input_array - self.input_mean) / self.input_std
|
||||
|
||||
output_array = self.forward(input_array)
|
||||
|
||||
return float(output_array[0, 0])
|
||||
|
||||
def validate_layers(self):
|
||||
for W, b, activation in self.layers:
|
||||
if not hasattr(self, activation):
|
||||
raise ValueError(f"Unknown activation: {activation}")
|
||||
|
||||
def check_for_friction_override(self):
|
||||
y = self.evaluate([10.0, 0.0, 0.2])
|
||||
self.friction_override = (y < 0.1)
|
||||
|
||||
def get_nn_model_path(car, eps_firmware) -> Tuple[Union[str, None, float]]:
|
||||
def check_nn_path(check_model):
|
||||
model_path = None
|
||||
max_similarity = -1.0
|
||||
for f in os.listdir(TORQUE_NN_MODEL_PATH):
|
||||
if f.endswith(".json"):
|
||||
model = f.replace(".json", "").replace(f"{TORQUE_NN_MODEL_PATH}/","")
|
||||
similarity_score = similarity(model, check_model)
|
||||
if similarity_score > max_similarity:
|
||||
max_similarity = similarity_score
|
||||
model_path = os.path.join(TORQUE_NN_MODEL_PATH, f)
|
||||
return model_path, max_similarity
|
||||
|
||||
if len(eps_firmware) > 3:
|
||||
eps_firmware = eps_firmware.replace("\\", "")
|
||||
check_model = f"{car} {eps_firmware}"
|
||||
else:
|
||||
check_model = car
|
||||
model_path, max_similarity = check_nn_path(check_model)
|
||||
if car not in model_path or 0.0 <= max_similarity < 0.9:
|
||||
check_model = car
|
||||
model_path, max_similarity = check_nn_path(check_model)
|
||||
if car not in model_path or 0.0 <= max_similarity < 0.9:
|
||||
model_path = None
|
||||
return model_path, max_similarity
|
||||
|
||||
def get_nn_model(car, eps_firmware) -> Tuple[Union[FluxModel, None, float]]:
|
||||
model, similarity_score = get_nn_model_path(car, eps_firmware)
|
||||
if model is not None:
|
||||
model = FluxModel(model)
|
||||
return model, similarity_score
|
||||
|
||||
# generic car and radar interfaces
|
||||
|
||||
class CarInterfaceBase(ABC):
|
||||
def __init__(self, CP, CarController, CarState):
|
||||
self.CP = CP
|
||||
self.VM = VehicleModel(CP)
|
||||
eps_firmware = str(next((fw.fwVersion for fw in CP.carFw if fw.ecu == "eps"), ""))
|
||||
|
||||
self.frame = 0
|
||||
self.steering_unpressed = 0
|
||||
@@ -91,12 +198,21 @@ class CarInterfaceBase(ABC):
|
||||
# FrogPilot variables
|
||||
params = Params()
|
||||
|
||||
self.has_lateral_torque_nn = self.initialize_lat_torque_nn(CP.carFingerprint, eps_firmware) and params.get_bool("LateralTune") and params.get_bool("NNFF")
|
||||
|
||||
self.belowSteerSpeed_shown = False
|
||||
self.disable_belowSteerSpeed = False
|
||||
|
||||
self.resumeRequired_shown = False
|
||||
self.disable_resumeRequired = False
|
||||
|
||||
def get_ff_nn(self, x):
|
||||
return self.lat_torque_nn_model.evaluate(x)
|
||||
|
||||
def initialize_lat_torque_nn(self, car, eps_firmware):
|
||||
self.lat_torque_nn_model, _ = get_nn_model(car, eps_firmware)
|
||||
return (self.lat_torque_nn_model is not None)
|
||||
|
||||
@staticmethod
|
||||
def get_pid_accel_limits(CP, current_speed, cruise_speed, sport_plus):
|
||||
if sport_plus:
|
||||
@@ -116,6 +232,15 @@ class CarInterfaceBase(ABC):
|
||||
ret = CarInterfaceBase.get_std_params(candidate)
|
||||
ret = cls._get_params(ret, candidate, fingerprint, car_fw, experimental_long, docs)
|
||||
|
||||
# Enable torque controller for all cars that do not use angle based steering
|
||||
if ret.steerControlType != car.CarParams.SteerControlType.angle and Params().get_bool("LateralTune") and Params().get_bool("NNFF"):
|
||||
CarInterfaceBase.configure_torque_tune(candidate, ret.lateralTuning)
|
||||
eps_firmware = str(next((fw.fwVersion for fw in car_fw if fw.ecu == "eps"), ""))
|
||||
model, similarity_score = get_nn_model_path(candidate, eps_firmware)
|
||||
if model is not None:
|
||||
Params("/dev/shm/params").put_bool("NNFFModelFuzzyMatch", similarity_score < 0.99)
|
||||
Params("/dev/shm/params").put("NNFFModelName", candidate)
|
||||
|
||||
# Vehicle mass is published curb weight plus assumed payload such as a human driver; notCars have no assumed payload
|
||||
if not ret.notCar:
|
||||
ret.mass = ret.mass + STD_CARGO_KG
|
||||
|
||||
Reference in New Issue
Block a user