Compile FrogPilot
This commit is contained in:
@@ -1,45 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
import binascii
|
||||
import time
|
||||
from collections import defaultdict
|
||||
|
||||
import cereal.messaging as messaging
|
||||
|
||||
|
||||
def can_printer(bus, max_msg, addr, ascii_decode):
|
||||
logcan = messaging.sub_sock('can', addr=addr)
|
||||
|
||||
start = time.monotonic()
|
||||
lp = time.monotonic()
|
||||
msgs = defaultdict(list)
|
||||
while 1:
|
||||
can_recv = messaging.drain_sock(logcan, wait_for_one=True)
|
||||
for x in can_recv:
|
||||
for y in x.can:
|
||||
if y.src == bus:
|
||||
msgs[y.address].append(y.dat)
|
||||
|
||||
if time.monotonic() - lp > 0.1:
|
||||
dd = chr(27) + "[2J"
|
||||
dd += f"{time.monotonic() - start:5.2f}\n"
|
||||
for addr in sorted(msgs.keys()):
|
||||
a = f"\"{msgs[addr][-1].decode('ascii', 'backslashreplace')}\"" if ascii_decode else ""
|
||||
x = binascii.hexlify(msgs[addr][-1]).decode('ascii')
|
||||
freq = len(msgs[addr]) / (time.monotonic() - start)
|
||||
if max_msg is None or addr < max_msg:
|
||||
dd += "%04X(%4d)(%6d)(%3dHz) %s %s\n" % (addr, addr, len(msgs[addr]), freq, x.ljust(20), a)
|
||||
print(dd)
|
||||
lp = time.monotonic()
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="simple CAN data viewer",
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
|
||||
parser.add_argument("--bus", type=int, help="CAN bus to print out", default=0)
|
||||
parser.add_argument("--max_msg", type=int, help="max addr")
|
||||
parser.add_argument("--ascii", action='store_true', help="decode as ascii")
|
||||
parser.add_argument("--addr", default="127.0.0.1")
|
||||
|
||||
args = parser.parse_args()
|
||||
can_printer(args.bus, args.max_msg, args.addr, args.ascii)
|
||||
@@ -1,50 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
import numpy as np
|
||||
import time
|
||||
from collections import defaultdict, deque
|
||||
from typing import DefaultDict, Deque, MutableSequence
|
||||
|
||||
import cereal.messaging as messaging
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
context = messaging.Context()
|
||||
poller = messaging.Poller()
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("socket", type=str, nargs='*', help="socket name")
|
||||
args = parser.parse_args()
|
||||
|
||||
socket_names = args.socket
|
||||
sockets = {}
|
||||
|
||||
rcv_times: DefaultDict[str, MutableSequence[float]] = defaultdict(lambda: deque(maxlen=100))
|
||||
valids: DefaultDict[str, Deque[bool]] = defaultdict(lambda: deque(maxlen=100))
|
||||
|
||||
t = time.monotonic()
|
||||
for name in socket_names:
|
||||
sock = messaging.sub_sock(name, poller=poller)
|
||||
sockets[sock] = name
|
||||
|
||||
prev_print = t
|
||||
while True:
|
||||
for socket in poller.poll(100):
|
||||
msg = messaging.recv_one(socket)
|
||||
if msg is None:
|
||||
continue
|
||||
|
||||
name = msg.which()
|
||||
|
||||
t = time.monotonic()
|
||||
rcv_times[name].append(msg.logMonoTime / 1e9)
|
||||
valids[name].append(msg.valid)
|
||||
|
||||
if t - prev_print > 1:
|
||||
print()
|
||||
for name in socket_names:
|
||||
dts = np.diff(rcv_times[name])
|
||||
mean = np.mean(dts)
|
||||
print(f"{name}: Freq {1.0 / mean:.2f} Hz, Min {np.min(dts) / mean * 100:.2f}%, Max {np.max(dts) / mean * 100:.2f}%, valid ", all(valids[name]))
|
||||
|
||||
prev_print = t
|
||||
@@ -1,63 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
import sys
|
||||
import argparse
|
||||
import json
|
||||
import codecs
|
||||
|
||||
from hexdump import hexdump
|
||||
from cereal import log
|
||||
from cereal.services import SERVICE_LIST
|
||||
from openpilot.tools.lib.live_logreader import raw_live_logreader
|
||||
|
||||
|
||||
codecs.register_error("strict", codecs.backslashreplace_errors)
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
parser = argparse.ArgumentParser(description='Dump communication sockets. See cereal/services.py for a complete list of available sockets.')
|
||||
parser.add_argument('--pipe', action='store_true')
|
||||
parser.add_argument('--raw', action='store_true')
|
||||
parser.add_argument('--json', action='store_true')
|
||||
parser.add_argument('--dump-json', action='store_true')
|
||||
parser.add_argument('--no-print', action='store_true')
|
||||
parser.add_argument('--addr', default='127.0.0.1')
|
||||
parser.add_argument('--values', help='values to monitor (instead of entire event)')
|
||||
parser.add_argument("socket", type=str, nargs='*', default=list(SERVICE_LIST.keys()), help="socket names to dump. defaults to all services defined in cereal")
|
||||
args = parser.parse_args()
|
||||
|
||||
lr = raw_live_logreader(args.socket, args.addr)
|
||||
|
||||
values = None
|
||||
if args.values:
|
||||
values = [s.strip().split(".") for s in args.values.split(",")]
|
||||
|
||||
for msg in lr:
|
||||
with log.Event.from_bytes(msg) as evt:
|
||||
if not args.no_print:
|
||||
if args.pipe:
|
||||
sys.stdout.write(str(msg))
|
||||
sys.stdout.flush()
|
||||
elif args.raw:
|
||||
hexdump(msg)
|
||||
elif args.json:
|
||||
print(json.loads(msg))
|
||||
elif args.dump_json:
|
||||
print(json.dumps(evt.to_dict()))
|
||||
elif values:
|
||||
print(f"logMonotime = {evt.logMonoTime}")
|
||||
for value in values:
|
||||
if hasattr(evt, value[0]):
|
||||
item = evt
|
||||
for key in value:
|
||||
item = getattr(item, key)
|
||||
print(f"{'.'.join(value)} = {item}")
|
||||
print("")
|
||||
else:
|
||||
try:
|
||||
print(evt)
|
||||
except UnicodeDecodeError:
|
||||
w = evt.which()
|
||||
s = f"( logMonoTime {evt.logMonoTime} \n {w} = "
|
||||
s += str(evt.__getattr__(w))
|
||||
s += f"\n valid = {evt.valid} )"
|
||||
print(s)
|
||||
@@ -1,79 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
import json
|
||||
|
||||
import cereal.messaging as messaging
|
||||
from openpilot.tools.lib.logreader import LogReader
|
||||
|
||||
LEVELS = {
|
||||
"DEBUG": 10,
|
||||
"INFO": 20,
|
||||
"WARNING": 30,
|
||||
"ERROR": 40,
|
||||
"CRITICAL": 50,
|
||||
}
|
||||
|
||||
ANDROID_LOG_SOURCE = {
|
||||
0: "MAIN",
|
||||
1: "RADIO",
|
||||
2: "EVENTS",
|
||||
3: "SYSTEM",
|
||||
4: "CRASH",
|
||||
5: "KERNEL",
|
||||
}
|
||||
|
||||
|
||||
def print_logmessage(t, msg, min_level):
|
||||
try:
|
||||
log = json.loads(msg)
|
||||
if log['levelnum'] >= min_level:
|
||||
print(f"[{t / 1e9:.6f}] {log['filename']}:{log.get('lineno', '')} - {log.get('funcname', '')}: {log['msg']}")
|
||||
if 'exc_info' in log:
|
||||
print(log['exc_info'])
|
||||
except json.decoder.JSONDecodeError:
|
||||
print(f"[{t / 1e9:.6f}] decode error: {msg}")
|
||||
|
||||
|
||||
def print_androidlog(t, msg):
|
||||
source = ANDROID_LOG_SOURCE[msg.id]
|
||||
try:
|
||||
m = json.loads(msg.message)['MESSAGE']
|
||||
except Exception:
|
||||
m = msg.message
|
||||
|
||||
print(f"[{t / 1e9:.6f}] {source} {msg.pid} {msg.tag} - {m}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('--absolute', action='store_true')
|
||||
parser.add_argument('--level', default='DEBUG')
|
||||
parser.add_argument('--addr', default='127.0.0.1')
|
||||
parser.add_argument("route", type=str, nargs='*', help="route name + segment number for offline usage")
|
||||
args = parser.parse_args()
|
||||
|
||||
min_level = LEVELS[args.level]
|
||||
|
||||
if args.route:
|
||||
st = None if not args.absolute else 0
|
||||
for route in args.route:
|
||||
lr = LogReader(route, sort_by_time=True)
|
||||
for m in lr:
|
||||
if st is None:
|
||||
st = m.logMonoTime
|
||||
if m.which() == 'logMessage':
|
||||
print_logmessage(m.logMonoTime-st, m.logMessage, min_level)
|
||||
elif m.which() == 'errorLogMessage':
|
||||
print_logmessage(m.logMonoTime-st, m.errorLogMessage, min_level)
|
||||
elif m.which() == 'androidLog':
|
||||
print_androidlog(m.logMonoTime-st, m.androidLog)
|
||||
else:
|
||||
sm = messaging.SubMaster(['logMessage', 'androidLog'], addr=args.addr)
|
||||
while True:
|
||||
sm.update()
|
||||
|
||||
if sm.updated['logMessage']:
|
||||
print_logmessage(sm.logMonoTime['logMessage'], sm['logMessage'], min_level)
|
||||
|
||||
if sm.updated['androidLog']:
|
||||
print_androidlog(sm.logMonoTime['androidLog'], sm['androidLog'])
|
||||
@@ -1,81 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
import jinja2
|
||||
import os
|
||||
|
||||
from cereal import car
|
||||
from openpilot.common.basedir import BASEDIR
|
||||
from openpilot.selfdrive.car.interfaces import get_interface_attr
|
||||
|
||||
Ecu = car.CarParams.Ecu
|
||||
|
||||
CARS = get_interface_attr('CAR')
|
||||
FW_VERSIONS = get_interface_attr('FW_VERSIONS')
|
||||
FINGERPRINTS = get_interface_attr('FINGERPRINTS')
|
||||
ECU_NAME = {v: k for k, v in Ecu.schema.enumerants.items()}
|
||||
|
||||
FINGERPRINTS_PY_TEMPLATE = jinja2.Template("""
|
||||
{%- if FINGERPRINTS[brand] %}
|
||||
# ruff: noqa: E501
|
||||
{% endif %}
|
||||
{% if FW_VERSIONS[brand] %}
|
||||
from cereal import car
|
||||
{% endif %}
|
||||
from openpilot.selfdrive.car.{{brand}}.values import CAR
|
||||
{% if FW_VERSIONS[brand] %}
|
||||
|
||||
Ecu = car.CarParams.Ecu
|
||||
{% endif %}
|
||||
{% if comments +%}
|
||||
{{ comments | join() }}
|
||||
{% endif %}
|
||||
{% if FINGERPRINTS[brand] %}
|
||||
|
||||
FINGERPRINTS = {
|
||||
{% for car, fingerprints in FINGERPRINTS[brand].items() %}
|
||||
CAR.{{car.name}}: [{
|
||||
{% for fingerprint in fingerprints %}
|
||||
{% if not loop.first %}
|
||||
{{ "{" }}
|
||||
{% endif %}
|
||||
{% for key, value in fingerprint.items() %}{{key}}: {{value}}{% if not loop.last %}, {% endif %}{% endfor %}
|
||||
|
||||
}{% if loop.last %}]{% endif %},
|
||||
{% endfor %}
|
||||
{% endfor %}
|
||||
}
|
||||
{% endif %}
|
||||
|
||||
FW_VERSIONS{% if not FW_VERSIONS[brand] %}: dict[str, dict[tuple, list[bytes]]]{% endif %} = {
|
||||
{% for car, _ in FW_VERSIONS[brand].items() %}
|
||||
CAR.{{car.name}}: {
|
||||
{% for key, fw_versions in FW_VERSIONS[brand][car].items() %}
|
||||
(Ecu.{{ECU_NAME[key[0]]}}, 0x{{"%0x" | format(key[1] | int)}}, \
|
||||
{% if key[2] %}0x{{"%0x" | format(key[2] | int)}}{% else %}{{key[2]}}{% endif %}): [
|
||||
{% for fw_version in (fw_versions + extra_fw_versions.get(car, {}).get(key, [])) | unique | sort %}
|
||||
{{fw_version}},
|
||||
{% endfor %}
|
||||
],
|
||||
{% endfor %}
|
||||
},
|
||||
{% endfor %}
|
||||
}
|
||||
|
||||
""", trim_blocks=True)
|
||||
|
||||
|
||||
def format_brand_fw_versions(brand, extra_fw_versions: None | dict[str, dict[tuple, list[bytes]]] = None):
|
||||
extra_fw_versions = extra_fw_versions or {}
|
||||
|
||||
fingerprints_file = os.path.join(BASEDIR, f"selfdrive/car/{brand}/fingerprints.py")
|
||||
with open(fingerprints_file, "r") as f:
|
||||
comments = [line for line in f.readlines() if line.startswith("#") and "noqa" not in line]
|
||||
|
||||
with open(fingerprints_file, "w") as f:
|
||||
f.write(FINGERPRINTS_PY_TEMPLATE.render(brand=brand, comments=comments, ECU_NAME=ECU_NAME,
|
||||
FINGERPRINTS=FINGERPRINTS, FW_VERSIONS=FW_VERSIONS,
|
||||
extra_fw_versions=extra_fw_versions))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
for brand in FW_VERSIONS.keys():
|
||||
format_brand_fw_versions(brand)
|
||||
@@ -1,31 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# simple script to get a vehicle fingerprint.
|
||||
|
||||
# Instructions:
|
||||
# - connect to a Panda
|
||||
# - run selfdrive/boardd/boardd
|
||||
# - launching this script
|
||||
# Note: it's very important that the car is in stock mode, in order to collect a complete fingerprint
|
||||
# - since some messages are published at low frequency, keep this script running for at least 30s,
|
||||
# until all messages are received at least once
|
||||
|
||||
import cereal.messaging as messaging
|
||||
|
||||
logcan = messaging.sub_sock('can')
|
||||
msgs = {}
|
||||
while True:
|
||||
lc = messaging.recv_sock(logcan, True)
|
||||
if lc is None:
|
||||
continue
|
||||
|
||||
for c in lc.can:
|
||||
# read also msgs sent by EON on CAN bus 0x80 and filter out the
|
||||
# addr with more than 11 bits
|
||||
if c.src % 0x80 == 0 and c.address < 0x800 and c.address not in (0x7df, 0x7e0, 0x7e8):
|
||||
msgs[c.address] = len(c.dat)
|
||||
|
||||
fingerprint = ', '.join("%d: %d" % v for v in sorted(msgs.items()))
|
||||
|
||||
print(f"number of messages {len(msgs)}:")
|
||||
print(f"fingerprint {fingerprint}")
|
||||
@@ -1,128 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Some Hyundai radars can be reconfigured to output (debug) radar points on bus 1.
|
||||
Reconfiguration is done over UDS by reading/writing to 0x0142 using the Read/Write Data By Identifier
|
||||
endpoints (0x22 & 0x2E). This script checks your radar firmware version against a list of known
|
||||
firmware versions. If you want to try on a new radar make sure to note the default config value
|
||||
in case it's different from the other radars and you need to revert the changes.
|
||||
|
||||
After changing the config the car should not show any faults when openpilot is not running.
|
||||
These config changes are persistent across car reboots. You need to run this script again
|
||||
to go back to the default values.
|
||||
|
||||
USE AT YOUR OWN RISK! Safety features, like AEB and FCW, might be affected by these changes."""
|
||||
|
||||
import sys
|
||||
import argparse
|
||||
from typing import NamedTuple
|
||||
from subprocess import check_output, CalledProcessError
|
||||
|
||||
from panda.python import Panda
|
||||
from panda.python.uds import UdsClient, SESSION_TYPE, DATA_IDENTIFIER_TYPE
|
||||
|
||||
class ConfigValues(NamedTuple):
|
||||
default_config: bytes
|
||||
tracks_enabled: bytes
|
||||
|
||||
# If your radar supports changing data identifier 0x0142 as well make a PR to
|
||||
# this file to add your firmware version. Make sure to post a drive as proof!
|
||||
# NOTE: these firmware versions do not match what openpilot uses
|
||||
# because this script uses a different diagnostic session type
|
||||
SUPPORTED_FW_VERSIONS = {
|
||||
# 2020 SONATA
|
||||
b"DN8_ SCC FHCUP 1.00 1.00 99110-L0000\x19\x08)\x15T ": ConfigValues(
|
||||
default_config=b"\x00\x00\x00\x01\x00\x00",
|
||||
tracks_enabled=b"\x00\x00\x00\x01\x00\x01"),
|
||||
b"DN8_ SCC F-CUP 1.00 1.00 99110-L0000\x19\x08)\x15T ": ConfigValues(
|
||||
default_config=b"\x00\x00\x00\x01\x00\x00",
|
||||
tracks_enabled=b"\x00\x00\x00\x01\x00\x01"),
|
||||
# 2021 SONATA HYBRID
|
||||
b"DNhe SCC FHCUP 1.00 1.02 99110-L5000 \x01#\x15# ": ConfigValues(
|
||||
default_config=b"\x00\x00\x00\x01\x00\x00",
|
||||
tracks_enabled=b"\x00\x00\x00\x01\x00\x01"),
|
||||
# 2020 PALISADE
|
||||
b"LX2_ SCC FHCUP 1.00 1.04 99110-S8100\x19\x05\x02\x16V ": ConfigValues(
|
||||
default_config=b"\x00\x00\x00\x01\x00\x00",
|
||||
tracks_enabled=b"\x00\x00\x00\x01\x00\x01"),
|
||||
# 2022 PALISADE
|
||||
b"LX2_ SCC FHCUP 1.00 1.00 99110-S8110!\x04\x05\x17\x01 ": ConfigValues(
|
||||
default_config=b"\x00\x00\x00\x01\x00\x00",
|
||||
tracks_enabled=b"\x00\x00\x00\x01\x00\x01"),
|
||||
# 2020 SANTA FE
|
||||
b"TM__ SCC F-CUP 1.00 1.03 99110-S2000\x19\x050\x13' ": ConfigValues(
|
||||
default_config=b"\x00\x00\x00\x01\x00\x00",
|
||||
tracks_enabled=b"\x00\x00\x00\x01\x00\x01"),
|
||||
# 2020 GENESIS G70
|
||||
b'IK__ SCC F-CUP 1.00 1.02 96400-G9100\x18\x07\x06\x17\x12 ': ConfigValues(
|
||||
default_config=b"\x00\x00\x00\x01\x00\x00",
|
||||
tracks_enabled=b"\x00\x00\x00\x01\x00\x01"),
|
||||
# 2019 SANTA FE
|
||||
b"TM__ SCC F-CUP 1.00 1.00 99110-S1210\x19\x01%\x168 ": ConfigValues(
|
||||
default_config=b"\x00\x00\x00\x01\x00\x00",
|
||||
tracks_enabled=b"\x00\x00\x00\x01\x00\x01"),
|
||||
b"TM__ SCC F-CUP 1.00 1.02 99110-S2000\x18\x07\x08\x18W ": ConfigValues(
|
||||
default_config=b"\x00\x00\x00\x01\x00\x00",
|
||||
tracks_enabled=b"\x00\x00\x00\x01\x00\x01"),
|
||||
# 2021 K5 HEV
|
||||
b"DLhe SCC FHCUP 1.00 1.02 99110-L7000 \x01 \x102 ": ConfigValues(
|
||||
default_config=b"\x00\x00\x00\x01\x00\x00",
|
||||
tracks_enabled=b"\x00\x00\x00\x01\x00\x01"),
|
||||
}
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description='configure radar to output points (or reset to default)')
|
||||
parser.add_argument('--default', action="store_true", default=False, help='reset to default configuration (default: false)')
|
||||
parser.add_argument('--debug', action="store_true", default=False, help='enable debug output (default: false)')
|
||||
parser.add_argument('--bus', type=int, default=0, help='can bus to use (default: 0)')
|
||||
args = parser.parse_args()
|
||||
|
||||
try:
|
||||
check_output(["pidof", "boardd"])
|
||||
print("boardd is running, please kill openpilot before running this script! (aborted)")
|
||||
sys.exit(1)
|
||||
except CalledProcessError as e:
|
||||
if e.returncode != 1: # 1 == no process found (boardd not running)
|
||||
raise e
|
||||
|
||||
confirm = input("power on the vehicle keeping the engine off (press start button twice) then type OK to continue: ").upper().strip()
|
||||
if confirm != "OK":
|
||||
print("\nyou didn't type 'OK! (aborted)")
|
||||
sys.exit(0)
|
||||
|
||||
panda = Panda()
|
||||
panda.set_safety_mode(Panda.SAFETY_ELM327)
|
||||
uds_client = UdsClient(panda, 0x7D0, bus=args.bus, debug=args.debug)
|
||||
|
||||
print("\n[START DIAGNOSTIC SESSION]")
|
||||
session_type : SESSION_TYPE = 0x07 # type: ignore
|
||||
uds_client.diagnostic_session_control(session_type)
|
||||
|
||||
print("[HARDWARE/SOFTWARE VERSION]")
|
||||
fw_version_data_id : DATA_IDENTIFIER_TYPE = 0xf100 # type: ignore
|
||||
fw_version = uds_client.read_data_by_identifier(fw_version_data_id)
|
||||
print(fw_version)
|
||||
if fw_version not in SUPPORTED_FW_VERSIONS.keys():
|
||||
print("radar not supported! (aborted)")
|
||||
sys.exit(1)
|
||||
|
||||
print("[GET CONFIGURATION]")
|
||||
config_data_id : DATA_IDENTIFIER_TYPE = 0x0142 # type: ignore
|
||||
current_config = uds_client.read_data_by_identifier(config_data_id)
|
||||
config_values = SUPPORTED_FW_VERSIONS[fw_version]
|
||||
new_config = config_values.default_config if args.default else config_values.tracks_enabled
|
||||
print(f"current config: 0x{current_config.hex()}")
|
||||
if current_config != new_config:
|
||||
print("[CHANGE CONFIGURATION]")
|
||||
print(f"new config: 0x{new_config.hex()}")
|
||||
uds_client.write_data_by_identifier(config_data_id, new_config)
|
||||
if not args.default and current_config != SUPPORTED_FW_VERSIONS[fw_version].default_config:
|
||||
print("\ncurrent config does not match expected default! (aborted)")
|
||||
sys.exit(1)
|
||||
|
||||
print("[DONE]")
|
||||
print("\nrestart your vehicle and ensure there are no faults")
|
||||
if not args.default:
|
||||
print("you can run this script again with --default to go back to the original (factory) settings")
|
||||
else:
|
||||
print("[DONE]")
|
||||
print("\ncurrent config is already the desired configuration")
|
||||
sys.exit(0)
|
||||
@@ -1,33 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
import time
|
||||
|
||||
from cereal import car, log, messaging
|
||||
from openpilot.common.params import Params
|
||||
from openpilot.selfdrive.manager.process_config import managed_processes
|
||||
|
||||
if __name__ == "__main__":
|
||||
CP = car.CarParams(notCar=True)
|
||||
Params().put("CarParams", CP.to_bytes())
|
||||
|
||||
procs = ['camerad', 'ui', 'modeld', 'calibrationd']
|
||||
for p in procs:
|
||||
managed_processes[p].start()
|
||||
|
||||
pm = messaging.PubMaster(['controlsState', 'deviceState', 'pandaStates', 'carParams'])
|
||||
|
||||
msgs = {s: messaging.new_message(s) for s in ['controlsState', 'deviceState', 'carParams']}
|
||||
msgs['deviceState'].deviceState.started = True
|
||||
msgs['carParams'].carParams.openpilotLongitudinalControl = True
|
||||
|
||||
msgs['pandaStates'] = messaging.new_message('pandaStates', 1)
|
||||
msgs['pandaStates'].pandaStates[0].ignitionLine = True
|
||||
msgs['pandaStates'].pandaStates[0].pandaType = log.PandaState.PandaType.uno
|
||||
|
||||
try:
|
||||
while True:
|
||||
time.sleep(1 / 100) # continually send, rate doesn't matter
|
||||
for s in msgs:
|
||||
pm.send(s, msgs[s])
|
||||
except KeyboardInterrupt:
|
||||
for p in procs:
|
||||
managed_processes[p].stop()
|
||||
@@ -1,157 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import argparse
|
||||
import struct
|
||||
from enum import IntEnum
|
||||
from panda import Panda
|
||||
from panda.python.uds import UdsClient, MessageTimeoutError, NegativeResponseError, SESSION_TYPE,\
|
||||
DATA_IDENTIFIER_TYPE, ACCESS_TYPE
|
||||
|
||||
# TODO: extend UDS library to allow custom/vendor-defined data identifiers without ignoring type checks
|
||||
class VOLKSWAGEN_DATA_IDENTIFIER_TYPE(IntEnum):
|
||||
CODING = 0x0600
|
||||
|
||||
# TODO: extend UDS library security_access() to take an access level offset per ISO 14229-1:2020 10.4 and remove this
|
||||
class ACCESS_TYPE_LEVEL_1(IntEnum):
|
||||
REQUEST_SEED = ACCESS_TYPE.REQUEST_SEED + 2
|
||||
SEND_KEY = ACCESS_TYPE.SEND_KEY + 2
|
||||
|
||||
MQB_EPS_CAN_ADDR = 0x712
|
||||
RX_OFFSET = 0x6a
|
||||
|
||||
if __name__ == "__main__":
|
||||
desc_text = "Shows Volkswagen EPS software and coding info, and enables or disables Heading Control Assist " + \
|
||||
"(Lane Assist). Useful for enabling HCA on cars without factory Lane Assist that want to use " + \
|
||||
"openpilot integrated at the CAN gateway (J533)."
|
||||
epilog_text = "This tool is meant to run directly on a vehicle-installed comma three, with the " + \
|
||||
"openpilot/tmux processes stopped. It should also work on a separate PC with a USB-attached comma " + \
|
||||
"panda. Vehicle ignition must be on. Recommend engine not be running when making changes. Must " + \
|
||||
"turn ignition off and on again for any changes to take effect."
|
||||
parser = argparse.ArgumentParser(description=desc_text, epilog=epilog_text)
|
||||
parser.add_argument("--debug", action="store_true", help="enable ISO-TP/UDS stack debugging output")
|
||||
parser.add_argument("action", choices={"show", "enable", "disable"}, help="show or modify current EPS HCA config")
|
||||
args = parser.parse_args()
|
||||
|
||||
panda = Panda()
|
||||
panda.set_safety_mode(Panda.SAFETY_ELM327)
|
||||
bus = 1 if panda.has_obd() else 0
|
||||
uds_client = UdsClient(panda, MQB_EPS_CAN_ADDR, MQB_EPS_CAN_ADDR + RX_OFFSET, bus, timeout=0.2, debug=args.debug)
|
||||
|
||||
try:
|
||||
uds_client.diagnostic_session_control(SESSION_TYPE.EXTENDED_DIAGNOSTIC)
|
||||
except MessageTimeoutError:
|
||||
print("Timeout opening session with EPS")
|
||||
quit()
|
||||
|
||||
odx_file, current_coding = None, None
|
||||
try:
|
||||
hw_pn = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.VEHICLE_MANUFACTURER_ECU_HARDWARE_NUMBER).decode("utf-8")
|
||||
sw_pn = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.VEHICLE_MANUFACTURER_SPARE_PART_NUMBER).decode("utf-8")
|
||||
sw_ver = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.VEHICLE_MANUFACTURER_ECU_SOFTWARE_VERSION_NUMBER).decode("utf-8")
|
||||
component = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.SYSTEM_NAME_OR_ENGINE_TYPE).decode("utf-8")
|
||||
odx_file = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.ODX_FILE).decode("utf-8").rstrip('\x00')
|
||||
current_coding = uds_client.read_data_by_identifier(VOLKSWAGEN_DATA_IDENTIFIER_TYPE.CODING) # type: ignore
|
||||
coding_text = current_coding.hex()
|
||||
|
||||
print("\nEPS diagnostic data\n")
|
||||
print(f" Part No HW: {hw_pn}")
|
||||
print(f" Part No SW: {sw_pn}")
|
||||
print(f" SW Version: {sw_ver}")
|
||||
print(f" Component: {component}")
|
||||
print(f" Coding: {coding_text}")
|
||||
print(f" ASAM Dataset: {odx_file}")
|
||||
except NegativeResponseError:
|
||||
print("Error fetching data from EPS")
|
||||
quit()
|
||||
except MessageTimeoutError:
|
||||
print("Timeout fetching data from EPS")
|
||||
quit()
|
||||
|
||||
coding_variant, current_coding_array, coding_byte, coding_bit = None, None, 0, 0
|
||||
coding_length = len(current_coding)
|
||||
|
||||
# EPS_MQB_ZFLS
|
||||
if odx_file in ("EV_SteerAssisMQB", "EV_SteerAssisMNB"):
|
||||
coding_variant = "ZFLS"
|
||||
coding_byte = 0
|
||||
coding_bit = 4
|
||||
|
||||
# MQB_PP_APA, MQB_VWBS_GEN2
|
||||
elif odx_file in ("EV_SteerAssisVWBSMQBA", "EV_SteerAssisVWBSMQBGen2"):
|
||||
coding_variant = "APA"
|
||||
coding_byte = 3
|
||||
coding_bit = 0
|
||||
|
||||
else:
|
||||
print("Configuration changes not yet supported on this EPS!")
|
||||
quit()
|
||||
|
||||
current_coding_array = struct.unpack(f"!{coding_length}B", current_coding)
|
||||
hca_enabled = (current_coding_array[coding_byte] & (1 << coding_bit) != 0)
|
||||
hca_text = ("DISABLED", "ENABLED")[hca_enabled]
|
||||
print(f" Lane Assist: {hca_text}")
|
||||
|
||||
try:
|
||||
params = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_DATA_IDENTIFICATION).decode("utf-8")
|
||||
param_version_system_params = params[1:3]
|
||||
param_vehicle_type = params[3:5]
|
||||
param_index_char_curve = params[5:7]
|
||||
param_version_char_values = params[7:9]
|
||||
param_version_memory_map = params[9:11]
|
||||
print("\nEPS parameterization (per-vehicle calibration) data\n")
|
||||
print(f" Version of system parameters: {param_version_system_params}")
|
||||
print(f" Vehicle type: {param_vehicle_type}")
|
||||
print(f" Index of characteristic curve: {param_index_char_curve}")
|
||||
print(f" Version of characteristic values: {param_version_char_values}")
|
||||
print(f" Version of memory map: {param_version_memory_map}")
|
||||
except (NegativeResponseError, MessageTimeoutError):
|
||||
print("Error fetching parameterization data from EPS!")
|
||||
quit()
|
||||
|
||||
if args.action in ["enable", "disable"]:
|
||||
print("\nAttempting configuration update")
|
||||
|
||||
assert(coding_variant in ("ZFLS", "APA"))
|
||||
# ZFLS EPS config coding length can be anywhere from 1 to 4 bytes, but the
|
||||
# bit we care about is always in the same place in the first byte
|
||||
if args.action == "enable":
|
||||
new_byte = current_coding_array[coding_byte] | (1 << coding_bit)
|
||||
else:
|
||||
new_byte = current_coding_array[coding_byte] & ~(1 << coding_bit)
|
||||
new_coding = current_coding[0:coding_byte] + new_byte.to_bytes(1, "little") + current_coding[coding_byte+1:]
|
||||
|
||||
try:
|
||||
seed = uds_client.security_access(ACCESS_TYPE_LEVEL_1.REQUEST_SEED) # type: ignore
|
||||
key = struct.unpack("!I", seed)[0] + 28183 # yeah, it's like that
|
||||
uds_client.security_access(ACCESS_TYPE_LEVEL_1.SEND_KEY, struct.pack("!I", key)) # type: ignore
|
||||
except (NegativeResponseError, MessageTimeoutError):
|
||||
print("Security access failed!")
|
||||
print("Open the hood and retry (disables the \"diagnostic firewall\" on newer vehicles)")
|
||||
quit()
|
||||
|
||||
try:
|
||||
# Programming date and tester number must be written before making
|
||||
# a change, or write to CODING will fail with request sequence error
|
||||
# Encoding on tester is unclear, it contains the workshop code in the
|
||||
# last two bytes, but not the VZ/importer or tester serial number
|
||||
# Can't seem to read it back, but we can read the calibration tester,
|
||||
# so fib a little and say that same tester did the programming
|
||||
# TODO: encode the actual current date
|
||||
prog_date = b'\x22\x02\x08'
|
||||
uds_client.write_data_by_identifier(DATA_IDENTIFIER_TYPE.PROGRAMMING_DATE, prog_date)
|
||||
tester_num = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.CALIBRATION_REPAIR_SHOP_CODE_OR_CALIBRATION_EQUIPMENT_SERIAL_NUMBER)
|
||||
uds_client.write_data_by_identifier(DATA_IDENTIFIER_TYPE.REPAIR_SHOP_CODE_OR_TESTER_SERIAL_NUMBER, tester_num)
|
||||
uds_client.write_data_by_identifier(VOLKSWAGEN_DATA_IDENTIFIER_TYPE.CODING, new_coding) # type: ignore
|
||||
except (NegativeResponseError, MessageTimeoutError):
|
||||
print("Writing new configuration failed!")
|
||||
print("Make sure the comma processes are stopped: tmux kill-session -t comma")
|
||||
quit()
|
||||
|
||||
try:
|
||||
# Read back result just to make 100% sure everything worked
|
||||
current_coding_text = uds_client.read_data_by_identifier(VOLKSWAGEN_DATA_IDENTIFIER_TYPE.CODING).hex() # type: ignore
|
||||
print(f" New coding: {current_coding_text}")
|
||||
except (NegativeResponseError, MessageTimeoutError):
|
||||
print("Reading back updated coding failed!")
|
||||
quit()
|
||||
print("EPS configuration successfully updated")
|
||||
Reference in New Issue
Block a user