wip
This commit is contained in:
@@ -1,18 +1,20 @@
|
||||
#!/usr/bin/env python3
|
||||
from collections import defaultdict
|
||||
from typing import Any, DefaultDict, Dict, Iterator, List, Optional, Set, TypeVar
|
||||
from collections.abc import Iterator
|
||||
from typing import Any, Protocol, TypeVar
|
||||
|
||||
from tqdm import tqdm
|
||||
import capnp
|
||||
|
||||
import panda.python.uds as uds
|
||||
from cereal import car
|
||||
from openpilot.common.params import Params
|
||||
from openpilot.selfdrive.car.ecu_addrs import get_ecu_addrs
|
||||
from openpilot.selfdrive.car.fw_query_definitions import AddrType, EcuAddrBusType, FwQueryConfig
|
||||
from openpilot.selfdrive.car.interfaces import get_interface_attr
|
||||
from openpilot.selfdrive.car.fingerprints import FW_VERSIONS
|
||||
from openpilot.selfdrive.car.isotp_parallel_query import IsoTpParallelQuery
|
||||
from openpilot.common.swaglog import cloudlog
|
||||
from openpilot.selfdrive.car.ecu_addrs import get_ecu_addrs
|
||||
from openpilot.selfdrive.car.fingerprints import FW_VERSIONS
|
||||
from openpilot.selfdrive.car.fw_query_definitions import AddrType, EcuAddrBusType, FwQueryConfig, LiveFwVersions, OfflineFwVersions
|
||||
from openpilot.selfdrive.car.interfaces import get_interface_attr
|
||||
from openpilot.selfdrive.car.isotp_parallel_query import IsoTpParallelQuery
|
||||
|
||||
Ecu = car.CarParams.Ecu
|
||||
ESSENTIAL_ECUS = [Ecu.engine, Ecu.eps, Ecu.abs, Ecu.fwdRadar, Ecu.fwdCamera, Ecu.vsa]
|
||||
@@ -27,19 +29,18 @@ REQUESTS = [(brand, config, r) for brand, config in FW_QUERY_CONFIGS.items() for
|
||||
T = TypeVar('T')
|
||||
|
||||
|
||||
def chunks(l: List[T], n: int = 128) -> Iterator[List[T]]:
|
||||
def chunks(l: list[T], n: int = 128) -> Iterator[list[T]]:
|
||||
for i in range(0, len(l), n):
|
||||
yield l[i:i + n]
|
||||
|
||||
|
||||
def is_brand(brand: str, filter_brand: Optional[str]) -> bool:
|
||||
def is_brand(brand: str, filter_brand: str | None) -> bool:
|
||||
"""Returns if brand matches filter_brand or no brand filter is specified"""
|
||||
return filter_brand is None or brand == filter_brand
|
||||
|
||||
|
||||
def build_fw_dict(fw_versions: List[capnp.lib.capnp._DynamicStructBuilder],
|
||||
filter_brand: Optional[str] = None) -> Dict[AddrType, Set[bytes]]:
|
||||
fw_versions_dict: DefaultDict[AddrType, Set[bytes]] = defaultdict(set)
|
||||
def build_fw_dict(fw_versions: list[capnp.lib.capnp._DynamicStructBuilder], filter_brand: str = None) -> dict[AddrType, set[bytes]]:
|
||||
fw_versions_dict: defaultdict[AddrType, set[bytes]] = defaultdict(set)
|
||||
for fw in fw_versions:
|
||||
if is_brand(fw.brand, filter_brand) and not fw.logging:
|
||||
sub_addr = fw.subAddress if fw.subAddress != 0 else None
|
||||
@@ -47,7 +48,12 @@ def build_fw_dict(fw_versions: List[capnp.lib.capnp._DynamicStructBuilder],
|
||||
return dict(fw_versions_dict)
|
||||
|
||||
|
||||
def match_fw_to_car_fuzzy(live_fw_versions, match_brand=None, log=True, exclude=None):
|
||||
class MatchFwToCar(Protocol):
|
||||
def __call__(self, live_fw_versions: LiveFwVersions, match_brand: str = None, log: bool = True) -> set[str]:
|
||||
...
|
||||
|
||||
|
||||
def match_fw_to_car_fuzzy(live_fw_versions: LiveFwVersions, match_brand: str = None, log: bool = True, exclude: str = None) -> set[str]:
|
||||
"""Do a fuzzy FW match. This function will return a match, and the number of firmware version
|
||||
that were matched uniquely to that specific car. If multiple ECUs uniquely match to different cars
|
||||
the match is rejected."""
|
||||
@@ -72,7 +78,7 @@ def match_fw_to_car_fuzzy(live_fw_versions, match_brand=None, log=True, exclude=
|
||||
all_fw_versions[(addr[1], addr[2], f)].append(candidate)
|
||||
|
||||
matched_ecus = set()
|
||||
candidate = None
|
||||
match: str | None = None
|
||||
for addr, versions in live_fw_versions.items():
|
||||
ecu_key = (addr[0], addr[1])
|
||||
for version in versions:
|
||||
@@ -81,23 +87,23 @@ def match_fw_to_car_fuzzy(live_fw_versions, match_brand=None, log=True, exclude=
|
||||
|
||||
if len(candidates) == 1:
|
||||
matched_ecus.add(ecu_key)
|
||||
if candidate is None:
|
||||
candidate = candidates[0]
|
||||
if match is None:
|
||||
match = candidates[0]
|
||||
# We uniquely matched two different cars. No fuzzy match possible
|
||||
elif candidate != candidates[0]:
|
||||
elif match != candidates[0]:
|
||||
return set()
|
||||
|
||||
# Note that it is possible to match to a candidate without all its ECUs being present
|
||||
# if there are enough matches. FIXME: parameterize this or require all ECUs to exist like exact matching
|
||||
if len(matched_ecus) >= 2:
|
||||
if match and len(matched_ecus) >= 2:
|
||||
if log:
|
||||
cloudlog.error(f"Fingerprinted {candidate} using fuzzy match. {len(matched_ecus)} matching ECUs")
|
||||
return {candidate}
|
||||
cloudlog.error(f"Fingerprinted {match} using fuzzy match. {len(matched_ecus)} matching ECUs")
|
||||
return {match}
|
||||
else:
|
||||
return set()
|
||||
|
||||
|
||||
def match_fw_to_car_exact(live_fw_versions, match_brand=None, log=True, extra_fw_versions=None) -> Set[str]:
|
||||
def match_fw_to_car_exact(live_fw_versions: LiveFwVersions, match_brand: str = None, log: bool = True, extra_fw_versions: dict = None) -> set[str]:
|
||||
"""Do an exact FW match. Returns all cars that match the given
|
||||
FW versions for a list of "essential" ECUs. If an ECU is not considered
|
||||
essential the FW version can be missing to get a fingerprint, but if it's present it
|
||||
@@ -138,9 +144,10 @@ def match_fw_to_car_exact(live_fw_versions, match_brand=None, log=True, extra_fw
|
||||
return set(candidates.keys()) - invalid
|
||||
|
||||
|
||||
def match_fw_to_car(fw_versions, allow_exact=True, allow_fuzzy=True, log=True):
|
||||
def match_fw_to_car(fw_versions: list[capnp.lib.capnp._DynamicStructBuilder], allow_exact: bool = True, allow_fuzzy: bool = True,
|
||||
log: bool = True) -> tuple[bool, set[str]]:
|
||||
# Try exact matching first
|
||||
exact_matches = []
|
||||
exact_matches: list[tuple[bool, MatchFwToCar]] = []
|
||||
if allow_exact:
|
||||
exact_matches = [(True, match_fw_to_car_exact)]
|
||||
if allow_fuzzy:
|
||||
@@ -148,7 +155,7 @@ def match_fw_to_car(fw_versions, allow_exact=True, allow_fuzzy=True, log=True):
|
||||
|
||||
for exact_match, match_func in exact_matches:
|
||||
# For each brand, attempt to fingerprint using all FW returned from its queries
|
||||
matches = set()
|
||||
matches: set[str] = set()
|
||||
for brand in VERSIONS.keys():
|
||||
fw_versions_dict = build_fw_dict(fw_versions, filter_brand=brand)
|
||||
matches |= match_func(fw_versions_dict, match_brand=brand, log=log)
|
||||
@@ -164,12 +171,12 @@ def match_fw_to_car(fw_versions, allow_exact=True, allow_fuzzy=True, log=True):
|
||||
return True, set()
|
||||
|
||||
|
||||
def get_present_ecus(logcan, sendcan, num_pandas=1) -> Set[EcuAddrBusType]:
|
||||
def get_present_ecus(logcan, sendcan, num_pandas: int = 1) -> set[EcuAddrBusType]:
|
||||
params = Params()
|
||||
# queries are split by OBD multiplexing mode
|
||||
queries: Dict[bool, List[List[EcuAddrBusType]]] = {True: [], False: []}
|
||||
parallel_queries: Dict[bool, List[EcuAddrBusType]] = {True: [], False: []}
|
||||
responses = set()
|
||||
queries: dict[bool, list[list[EcuAddrBusType]]] = {True: [], False: []}
|
||||
parallel_queries: dict[bool, list[EcuAddrBusType]] = {True: [], False: []}
|
||||
responses: set[EcuAddrBusType] = set()
|
||||
|
||||
for brand, config, r in REQUESTS:
|
||||
# Skip query if no panda available
|
||||
@@ -203,7 +210,7 @@ def get_present_ecus(logcan, sendcan, num_pandas=1) -> Set[EcuAddrBusType]:
|
||||
return ecu_responses
|
||||
|
||||
|
||||
def get_brand_ecu_matches(ecu_rx_addrs: Set[EcuAddrBusType]) -> dict[str, set[AddrType]]:
|
||||
def get_brand_ecu_matches(ecu_rx_addrs: set[EcuAddrBusType]) -> dict[str, set[AddrType]]:
|
||||
"""Returns dictionary of brands and matches with ECUs in their FW versions"""
|
||||
|
||||
brand_addrs = {brand: {(addr, subaddr) for _, addr, subaddr in config.get_all_ecus(VERSIONS[brand])} for
|
||||
@@ -230,8 +237,8 @@ def set_obd_multiplexing(params: Params, obd_multiplexing: bool):
|
||||
cloudlog.warning("OBD multiplexing set successfully")
|
||||
|
||||
|
||||
def get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs, timeout=0.1, num_pandas=1, debug=False, progress=False) -> \
|
||||
List[capnp.lib.capnp._DynamicStructBuilder]:
|
||||
def get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs: set[EcuAddrBusType], timeout: float = 0.1, num_pandas: int = 1,
|
||||
debug: bool = False, progress: bool = False) -> list[capnp.lib.capnp._DynamicStructBuilder]:
|
||||
"""Queries for FW versions ordering brands by likelihood, breaks when exact match is found"""
|
||||
|
||||
all_car_fw = []
|
||||
@@ -253,8 +260,8 @@ def get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs, timeout=0.1, num_pand
|
||||
return all_car_fw
|
||||
|
||||
|
||||
def get_fw_versions(logcan, sendcan, query_brand=None, extra=None, timeout=0.1, num_pandas=1, debug=False, progress=False) -> \
|
||||
List[capnp.lib.capnp._DynamicStructBuilder]:
|
||||
def get_fw_versions(logcan, sendcan, query_brand: str = None, extra: OfflineFwVersions = None, timeout: float = 0.1, num_pandas: int = 1,
|
||||
debug: bool = False, progress: bool = False) -> list[capnp.lib.capnp._DynamicStructBuilder]:
|
||||
versions = VERSIONS.copy()
|
||||
params = Params()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user