openpilot v0.9.6 release

date: 2024-02-21T23:02:42
master commit: 0b4d08fab8e35a264bc7383e878538f8083c33e5
This commit is contained in:
FrogAi
2024-02-27 16:34:45 -07:00
commit 2901597132
1940 changed files with 647891 additions and 0 deletions

946
panda/python/__init__.py Normal file
View File

@@ -0,0 +1,946 @@
# python library to interface with panda
import os
import sys
import time
import usb1
import struct
import hashlib
import binascii
import datetime
import logging
from functools import wraps, partial
from typing import Optional
from itertools import accumulate
from .base import BaseHandle
from .constants import FW_PATH, McuType
from .dfu import PandaDFU
from .isotp import isotp_send, isotp_recv
from .spi import PandaSpiHandle, PandaSpiException, PandaProtocolMismatch
from .usb import PandaUsbHandle
__version__ = '0.0.10'
# setup logging
LOGLEVEL = os.environ.get('LOGLEVEL', 'INFO').upper()
logging.basicConfig(level=LOGLEVEL, format='%(message)s')
CANPACKET_HEAD_SIZE = 0x6
DLC_TO_LEN = [0, 1, 2, 3, 4, 5, 6, 7, 8, 12, 16, 20, 24, 32, 48, 64]
LEN_TO_DLC = {length: dlc for (dlc, length) in enumerate(DLC_TO_LEN)}
PANDA_BUS_CNT = 4
def calculate_checksum(data):
res = 0
for b in data:
res ^= b
return res
def pack_can_buffer(arr):
snds = [b'']
for address, _, dat, bus in arr:
assert len(dat) in LEN_TO_DLC
#logging.debug(" W 0x%x: 0x%s", address, dat.hex())
extended = 1 if address >= 0x800 else 0
data_len_code = LEN_TO_DLC[len(dat)]
header = bytearray(CANPACKET_HEAD_SIZE)
word_4b = address << 3 | extended << 2
header[0] = (data_len_code << 4) | (bus << 1)
header[1] = word_4b & 0xFF
header[2] = (word_4b >> 8) & 0xFF
header[3] = (word_4b >> 16) & 0xFF
header[4] = (word_4b >> 24) & 0xFF
header[5] = calculate_checksum(header[:5] + dat)
snds[-1] += header + dat
if len(snds[-1]) > 256: # Limit chunks to 256 bytes
snds.append(b'')
return snds
def unpack_can_buffer(dat):
ret = []
while len(dat) >= CANPACKET_HEAD_SIZE:
data_len = DLC_TO_LEN[(dat[0]>>4)]
header = dat[:CANPACKET_HEAD_SIZE]
bus = (header[0] >> 1) & 0x7
address = (header[4] << 24 | header[3] << 16 | header[2] << 8 | header[1]) >> 3
if (header[1] >> 1) & 0x1:
# returned
bus += 128
if header[1] & 0x1:
# rejected
bus += 192
# we need more from the next transfer
if data_len > len(dat) - CANPACKET_HEAD_SIZE:
break
assert calculate_checksum(dat[:(CANPACKET_HEAD_SIZE+data_len)]) == 0, "CAN packet checksum incorrect"
data = dat[CANPACKET_HEAD_SIZE:(CANPACKET_HEAD_SIZE+data_len)]
dat = dat[(CANPACKET_HEAD_SIZE+data_len):]
ret.append((address, 0, data, bus))
return (ret, dat)
def ensure_version(desc, lib_field, panda_field, fn):
@wraps(fn)
def wrapper(self, *args, **kwargs):
lib_version = getattr(self, lib_field)
panda_version = getattr(self, panda_field)
if lib_version != panda_version:
raise RuntimeError(f"{desc} packet version mismatch: panda's firmware v{panda_version}, library v{lib_version}. Reflash panda.")
return fn(self, *args, **kwargs)
return wrapper
ensure_can_packet_version = partial(ensure_version, "CAN", "CAN_PACKET_VERSION", "can_version")
ensure_can_health_packet_version = partial(ensure_version, "CAN health", "CAN_HEALTH_PACKET_VERSION", "can_health_version")
ensure_health_packet_version = partial(ensure_version, "health", "HEALTH_PACKET_VERSION", "health_version")
class ALTERNATIVE_EXPERIENCE:
DEFAULT = 0
DISABLE_DISENGAGE_ON_GAS = 1
DISABLE_STOCK_AEB = 2
RAISE_LONGITUDINAL_LIMITS_TO_ISO_MAX = 8
ALLOW_AEB = 16
class Panda:
# matches cereal.car.CarParams.SafetyModel
SAFETY_SILENT = 0
SAFETY_HONDA_NIDEC = 1
SAFETY_TOYOTA = 2
SAFETY_ELM327 = 3
SAFETY_GM = 4
SAFETY_HONDA_BOSCH_GIRAFFE = 5
SAFETY_FORD = 6
SAFETY_HYUNDAI = 8
SAFETY_CHRYSLER = 9
SAFETY_TESLA = 10
SAFETY_SUBARU = 11
SAFETY_MAZDA = 13
SAFETY_NISSAN = 14
SAFETY_VOLKSWAGEN_MQB = 15
SAFETY_ALLOUTPUT = 17
SAFETY_GM_ASCM = 18
SAFETY_NOOUTPUT = 19
SAFETY_HONDA_BOSCH = 20
SAFETY_VOLKSWAGEN_PQ = 21
SAFETY_SUBARU_PREGLOBAL = 22
SAFETY_HYUNDAI_LEGACY = 23
SAFETY_HYUNDAI_COMMUNITY = 24
SAFETY_STELLANTIS = 25
SAFETY_FAW = 26
SAFETY_BODY = 27
SAFETY_HYUNDAI_CANFD = 28
SERIAL_DEBUG = 0
SERIAL_ESP = 1
SERIAL_LIN1 = 2
SERIAL_LIN2 = 3
SERIAL_SOM_DEBUG = 4
GMLAN_CAN2 = 1
GMLAN_CAN3 = 2
USB_PIDS = (0xddee, 0xddcc)
REQUEST_IN = usb1.ENDPOINT_IN | usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE
REQUEST_OUT = usb1.ENDPOINT_OUT | usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE
HW_TYPE_UNKNOWN = b'\x00'
HW_TYPE_WHITE_PANDA = b'\x01'
HW_TYPE_GREY_PANDA = b'\x02'
HW_TYPE_BLACK_PANDA = b'\x03'
HW_TYPE_PEDAL = b'\x04'
HW_TYPE_UNO = b'\x05'
HW_TYPE_DOS = b'\x06'
HW_TYPE_RED_PANDA = b'\x07'
HW_TYPE_RED_PANDA_V2 = b'\x08'
HW_TYPE_TRES = b'\x09'
HW_TYPE_CUATRO = b'\x0a'
CAN_PACKET_VERSION = 4
HEALTH_PACKET_VERSION = 15
CAN_HEALTH_PACKET_VERSION = 5
HEALTH_STRUCT = struct.Struct("<IIIIIIIIIBBBBBHBBBHfBBHBHHB")
CAN_HEALTH_STRUCT = struct.Struct("<BIBBBBBBBBIIIIIIIHHBBBIIII")
F4_DEVICES = [HW_TYPE_WHITE_PANDA, HW_TYPE_GREY_PANDA, HW_TYPE_BLACK_PANDA, HW_TYPE_UNO, HW_TYPE_DOS]
H7_DEVICES = [HW_TYPE_RED_PANDA, HW_TYPE_RED_PANDA_V2, HW_TYPE_TRES, HW_TYPE_CUATRO]
INTERNAL_DEVICES = (HW_TYPE_UNO, HW_TYPE_DOS, HW_TYPE_TRES, HW_TYPE_CUATRO)
HAS_OBD = (HW_TYPE_BLACK_PANDA, HW_TYPE_UNO, HW_TYPE_DOS, HW_TYPE_RED_PANDA, HW_TYPE_RED_PANDA_V2, HW_TYPE_TRES, HW_TYPE_CUATRO)
MAX_FAN_RPMs = {
HW_TYPE_UNO: 5100,
HW_TYPE_DOS: 6500,
HW_TYPE_TRES: 6600,
HW_TYPE_CUATRO: 6600,
}
HARNESS_STATUS_NC = 0
HARNESS_STATUS_NORMAL = 1
HARNESS_STATUS_FLIPPED = 2
# first byte is for EPS scaling factor
FLAG_TOYOTA_ALT_BRAKE = (1 << 8)
FLAG_TOYOTA_STOCK_LONGITUDINAL = (2 << 8)
FLAG_TOYOTA_LTA = (4 << 8)
FLAG_TOYOTA_GAS_INTERCEPTOR = (8 << 8)
FLAG_HONDA_ALT_BRAKE = 1
FLAG_HONDA_BOSCH_LONG = 2
FLAG_HONDA_NIDEC_ALT = 4
FLAG_HONDA_RADARLESS = 8
FLAG_HONDA_GAS_INTERCEPTOR = 16
FLAG_HYUNDAI_EV_GAS = 1
FLAG_HYUNDAI_HYBRID_GAS = 2
FLAG_HYUNDAI_LONG = 4
FLAG_HYUNDAI_CAMERA_SCC = 8
FLAG_HYUNDAI_CANFD_HDA2 = 16
FLAG_HYUNDAI_CANFD_ALT_BUTTONS = 32
FLAG_HYUNDAI_ALT_LIMITS = 64
FLAG_HYUNDAI_CANFD_HDA2_ALT_STEERING = 128
FLAG_TESLA_POWERTRAIN = 1
FLAG_TESLA_LONG_CONTROL = 2
FLAG_VOLKSWAGEN_LONG_CONTROL = 1
FLAG_CHRYSLER_RAM_DT = 1
FLAG_CHRYSLER_RAM_HD = 2
FLAG_SUBARU_GEN2 = 1
FLAG_SUBARU_LONG = 2
FLAG_SUBARU_PREGLOBAL_REVERSED_DRIVER_TORQUE = 1
FLAG_NISSAN_ALT_EPS_BUS = 1
FLAG_GM_HW_CAM = 1
FLAG_GM_HW_CAM_LONG = 2
FLAG_FORD_LONG_CONTROL = 1
FLAG_FORD_CANFD = 2
def __init__(self, serial: Optional[str] = None, claim: bool = True, disable_checks: bool = True, can_speed_kbps: int = 500):
self._connect_serial = serial
self._disable_checks = disable_checks
self._handle: BaseHandle
self._handle_open = False
self.can_rx_overflow_buffer = b''
self._can_speed_kbps = can_speed_kbps
# connect and set mcu type
self.connect(claim)
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
def close(self):
if self._handle_open:
self._handle.close()
self._handle_open = False
if self._context is not None:
self._context.close()
def connect(self, claim=True, wait=False):
self.close()
self._handle = None
while self._handle is None:
# try USB first, then SPI
self._context, self._handle, serial, self.bootstub, bcd = self.usb_connect(self._connect_serial, claim=claim)
if self._handle is None:
self._context, self._handle, serial, self.bootstub, bcd = self.spi_connect(self._connect_serial)
if not wait:
break
if self._handle is None:
raise Exception("failed to connect to panda")
# Some fallback logic to determine panda and MCU type for old bootstubs,
# since we now support multiple MCUs and need to know which fw to flash.
# Three cases to consider:
# A) oldest bootstubs don't have any way to distinguish
# MCU or panda type
# B) slightly newer (~2 weeks after first C3's built) bootstubs
# have the panda type set in the USB bcdDevice
# C) latest bootstubs also implement the endpoint for panda type
self._bcd_hw_type = None
ret = self._handle.controlRead(Panda.REQUEST_IN, 0xc1, 0, 0, 0x40)
missing_hw_type_endpoint = self.bootstub and ret.startswith(b'\xff\x00\xc1\x3e\xde\xad\xd0\x0d')
if missing_hw_type_endpoint and bcd is not None:
self._bcd_hw_type = bcd
# For case A, we assume F4 MCU type, since all H7 pandas should be case B at worst
self._assume_f4_mcu = (self._bcd_hw_type is None) and missing_hw_type_endpoint
self._serial = serial
self._connect_serial = serial
self._handle_open = True
self._mcu_type = self.get_mcu_type()
self.health_version, self.can_version, self.can_health_version = self.get_packets_versions()
logging.debug("connected")
# disable openpilot's heartbeat checks
if self._disable_checks:
self.set_heartbeat_disabled()
self.set_power_save(0)
# reset comms
self.can_reset_communications()
# set CAN speed
for bus in range(PANDA_BUS_CNT):
self.set_can_speed_kbps(bus, self._can_speed_kbps)
@classmethod
def spi_connect(cls, serial, ignore_version=False):
# get UID to confirm slave is present and up
handle = None
spi_serial = None
bootstub = None
spi_version = None
try:
handle = PandaSpiHandle()
# connect by protcol version
try:
dat = handle.get_protocol_version()
spi_serial = binascii.hexlify(dat[:12]).decode()
pid = dat[13]
if pid not in (0xcc, 0xee):
raise PandaSpiException("invalid bootstub status")
bootstub = pid == 0xee
spi_version = dat[14]
except PandaSpiException:
# fallback, we'll raise a protocol mismatch below
dat = handle.controlRead(Panda.REQUEST_IN, 0xc3, 0, 0, 12, timeout=100)
spi_serial = binascii.hexlify(dat).decode()
bootstub = Panda.flasher_present(handle)
spi_version = 0
except PandaSpiException:
pass
# no connection or wrong panda
if None in (spi_serial, bootstub) or (serial is not None and (spi_serial != serial)):
handle = None
spi_serial = None
bootstub = False
# ensure our protocol version matches the panda
if handle is not None and not ignore_version:
if spi_version != handle.PROTOCOL_VERSION:
err = f"panda protocol mismatch: expected {handle.PROTOCOL_VERSION}, got {spi_version}. reflash panda"
raise PandaProtocolMismatch(err)
return None, handle, spi_serial, bootstub, None
@classmethod
def usb_connect(cls, serial, claim=True):
handle, usb_serial, bootstub, bcd = None, None, None, None
context = usb1.USBContext()
context.open()
try:
for device in context.getDeviceList(skip_on_error=True):
if device.getVendorID() == 0xbbaa and device.getProductID() in cls.USB_PIDS:
try:
this_serial = device.getSerialNumber()
except Exception:
continue
if serial is None or this_serial == serial:
logging.debug("opening device %s %s", this_serial, hex(device.getProductID()))
usb_serial = this_serial
bootstub = (device.getProductID() & 0xF0) == 0xe0
handle = device.open()
if sys.platform not in ("win32", "cygwin", "msys", "darwin"):
handle.setAutoDetachKernelDriver(True)
if claim:
handle.claimInterface(0)
# handle.setInterfaceAltSetting(0, 0) # Issue in USB stack
# bcdDevice wasn't always set to the hw type, ignore if it's the old constant
this_bcd = device.getbcdDevice()
if this_bcd is not None and this_bcd != 0x2300:
bcd = bytearray([this_bcd >> 8, ])
break
except Exception:
logging.exception("USB connect error")
usb_handle = None
if handle is not None:
usb_handle = PandaUsbHandle(handle)
else:
context.close()
return context, usb_handle, usb_serial, bootstub, bcd
@classmethod
def list(cls):
ret = cls.usb_list()
ret += cls.spi_list()
return list(set(ret))
@classmethod
def usb_list(cls):
ret = []
try:
with usb1.USBContext() as context:
for device in context.getDeviceList(skip_on_error=True):
if device.getVendorID() == 0xbbaa and device.getProductID() in cls.USB_PIDS:
try:
serial = device.getSerialNumber()
if len(serial) == 24:
ret.append(serial)
else:
logging.warning(f"found device with panda descriptors but invalid serial: {serial}", RuntimeWarning)
except Exception:
continue
except Exception:
logging.exception("exception while listing pandas")
return ret
@classmethod
def spi_list(cls):
_, _, serial, _, _ = cls.spi_connect(None, ignore_version=True)
if serial is not None:
return [serial, ]
return []
def reset(self, enter_bootstub=False, enter_bootloader=False, reconnect=True):
# no response is expected since it resets right away
timeout = 5000 if isinstance(self._handle, PandaSpiHandle) else 15000
try:
if enter_bootloader:
self._handle.controlWrite(Panda.REQUEST_IN, 0xd1, 0, 0, b'', timeout=timeout, expect_disconnect=True)
else:
if enter_bootstub:
self._handle.controlWrite(Panda.REQUEST_IN, 0xd1, 1, 0, b'', timeout=timeout, expect_disconnect=True)
else:
self._handle.controlWrite(Panda.REQUEST_IN, 0xd8, 0, 0, b'', timeout=timeout, expect_disconnect=True)
except Exception:
pass
if not enter_bootloader and reconnect:
self.reconnect()
@property
def connected(self) -> bool:
return self._handle_open
def reconnect(self):
if self._handle_open:
self.close()
success = False
# wait up to 15 seconds
for _ in range(15*10):
try:
self.connect()
success = True
break
except Exception:
pass
time.sleep(0.1)
if not success:
raise Exception("reconnect failed")
@staticmethod
def flasher_present(handle: BaseHandle) -> bool:
fr = handle.controlRead(Panda.REQUEST_IN, 0xb0, 0, 0, 0xc)
return fr[4:8] == b"\xde\xad\xd0\x0d"
@staticmethod
def flash_static(handle, code, mcu_type):
assert mcu_type is not None, "must set valid mcu_type to flash"
# confirm flasher is present
assert Panda.flasher_present(handle)
# determine sectors to erase
apps_sectors_cumsum = accumulate(mcu_type.config.sector_sizes[1:])
last_sector = next((i + 1 for i, v in enumerate(apps_sectors_cumsum) if v > len(code)), -1)
assert last_sector >= 1, "Binary too small? No sector to erase."
assert last_sector < 7, "Binary too large! Risk of overwriting provisioning chunk."
# unlock flash
logging.warning("flash: unlocking")
handle.controlWrite(Panda.REQUEST_IN, 0xb1, 0, 0, b'')
# erase sectors
logging.warning(f"flash: erasing sectors 1 - {last_sector}")
for i in range(1, last_sector + 1):
handle.controlWrite(Panda.REQUEST_IN, 0xb2, i, 0, b'')
# flash over EP2
STEP = 0x10
logging.warning("flash: flashing")
for i in range(0, len(code), STEP):
handle.bulkWrite(2, code[i:i + STEP])
# reset
logging.warning("flash: resetting")
try:
handle.controlWrite(Panda.REQUEST_IN, 0xd8, 0, 0, b'', expect_disconnect=True)
except Exception:
pass
def flash(self, fn=None, code=None, reconnect=True):
if self.up_to_date(fn=fn):
logging.debug("flash: already up to date")
return
if not fn:
fn = os.path.join(FW_PATH, self._mcu_type.config.app_fn)
assert os.path.isfile(fn)
logging.debug("flash: main version is %s", self.get_version())
if not self.bootstub:
self.reset(enter_bootstub=True)
assert(self.bootstub)
if code is None:
with open(fn, "rb") as f:
code = f.read()
# get version
logging.debug("flash: bootstub version is %s", self.get_version())
# do flash
Panda.flash_static(self._handle, code, mcu_type=self._mcu_type)
# reconnect
if reconnect:
self.reconnect()
def recover(self, timeout: Optional[int] = 60, reset: bool = True) -> bool:
dfu_serial = self.get_dfu_serial()
if reset:
self.reset(enter_bootstub=True)
self.reset(enter_bootloader=True)
if not self.wait_for_dfu(dfu_serial, timeout=timeout):
return False
dfu = PandaDFU(dfu_serial)
dfu.recover()
# reflash after recover
self.connect(True, True)
self.flash()
return True
@staticmethod
def wait_for_dfu(dfu_serial: Optional[str], timeout: Optional[int] = None) -> bool:
t_start = time.monotonic()
dfu_list = PandaDFU.list()
while (dfu_serial is None and len(dfu_list) == 0) or (dfu_serial is not None and dfu_serial not in dfu_list):
logging.debug("waiting for DFU...")
time.sleep(0.1)
if timeout is not None and (time.monotonic() - t_start) > timeout:
return False
dfu_list = PandaDFU.list()
return True
@staticmethod
def wait_for_panda(serial: Optional[str], timeout: int) -> bool:
t_start = time.monotonic()
serials = Panda.list()
while (serial is None and len(serials) == 0) or (serial is not None and serial not in serials):
logging.debug("waiting for panda...")
time.sleep(0.1)
if timeout is not None and (time.monotonic() - t_start) > timeout:
return False
serials = Panda.list()
return True
def up_to_date(self, fn=None) -> bool:
current = self.get_signature()
if fn is None:
fn = os.path.join(FW_PATH, self.get_mcu_type().config.app_fn)
expected = Panda.get_signature_from_firmware(fn)
return (current == expected)
def call_control_api(self, msg):
self._handle.controlWrite(Panda.REQUEST_OUT, msg, 0, 0, b'')
# ******************* health *******************
@ensure_health_packet_version
def health(self):
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xd2, 0, 0, self.HEALTH_STRUCT.size)
a = self.HEALTH_STRUCT.unpack(dat)
return {
"uptime": a[0],
"voltage": a[1],
"current": a[2],
"safety_tx_blocked": a[3],
"safety_rx_invalid": a[4],
"tx_buffer_overflow": a[5],
"rx_buffer_overflow": a[6],
"gmlan_send_errs": a[7],
"faults": a[8],
"ignition_line": a[9],
"ignition_can": a[10],
"controls_allowed": a[11],
"car_harness_status": a[12],
"safety_mode": a[13],
"safety_param": a[14],
"fault_status": a[15],
"power_save_enabled": a[16],
"heartbeat_lost": a[17],
"alternative_experience": a[18],
"interrupt_load": a[19],
"fan_power": a[20],
"safety_rx_checks_invalid": a[21],
"spi_checksum_error_count": a[22],
"fan_stall_count": a[23],
"sbu1_voltage_mV": a[24],
"sbu2_voltage_mV": a[25],
"som_reset_triggered": a[26],
}
@ensure_can_health_packet_version
def can_health(self, can_number):
LEC_ERROR_CODE = {
0: "No error",
1: "Stuff error",
2: "Form error",
3: "AckError",
4: "Bit1Error",
5: "Bit0Error",
6: "CRCError",
7: "NoChange",
}
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xc2, int(can_number), 0, self.CAN_HEALTH_STRUCT.size)
a = self.CAN_HEALTH_STRUCT.unpack(dat)
return {
"bus_off": a[0],
"bus_off_cnt": a[1],
"error_warning": a[2],
"error_passive": a[3],
"last_error": LEC_ERROR_CODE[a[4]],
"last_stored_error": LEC_ERROR_CODE[a[5]],
"last_data_error": LEC_ERROR_CODE[a[6]],
"last_data_stored_error": LEC_ERROR_CODE[a[7]],
"receive_error_cnt": a[8],
"transmit_error_cnt": a[9],
"total_error_cnt": a[10],
"total_tx_lost_cnt": a[11],
"total_rx_lost_cnt": a[12],
"total_tx_cnt": a[13],
"total_rx_cnt": a[14],
"total_fwd_cnt": a[15],
"total_tx_checksum_error_cnt": a[16],
"can_speed": a[17],
"can_data_speed": a[18],
"canfd_enabled": a[19],
"brs_enabled": a[20],
"canfd_non_iso": a[21],
"irq0_call_rate": a[22],
"irq1_call_rate": a[23],
"irq2_call_rate": a[24],
"can_core_reset_count": a[25],
}
# ******************* control *******************
def get_version(self):
return self._handle.controlRead(Panda.REQUEST_IN, 0xd6, 0, 0, 0x40).decode('utf8')
@staticmethod
def get_signature_from_firmware(fn) -> bytes:
with open(fn, 'rb') as f:
f.seek(-128, 2) # Seek from end of file
return f.read(128)
def get_signature(self) -> bytes:
part_1 = self._handle.controlRead(Panda.REQUEST_IN, 0xd3, 0, 0, 0x40)
part_2 = self._handle.controlRead(Panda.REQUEST_IN, 0xd4, 0, 0, 0x40)
return bytes(part_1 + part_2)
def get_type(self):
ret = self._handle.controlRead(Panda.REQUEST_IN, 0xc1, 0, 0, 0x40)
# old bootstubs don't implement this endpoint, see comment in Panda.device
if self._bcd_hw_type is not None and (ret is None or len(ret) != 1):
ret = self._bcd_hw_type
return ret
# Returns tuple with health packet version and CAN packet/USB packet version
def get_packets_versions(self):
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xdd, 0, 0, 3)
if dat and len(dat) == 3:
a = struct.unpack("BBB", dat)
return (a[0], a[1], a[2])
else:
return (0, 0, 0)
def get_mcu_type(self) -> McuType:
hw_type = self.get_type()
if hw_type in Panda.F4_DEVICES:
return McuType.F4
elif hw_type in Panda.H7_DEVICES:
return McuType.H7
else:
# have to assume F4, see comment in Panda.connect
if self._assume_f4_mcu:
return McuType.F4
raise ValueError(f"unknown HW type: {hw_type}")
def has_obd(self):
return self.get_type() in Panda.HAS_OBD
def is_internal(self):
return self.get_type() in Panda.INTERNAL_DEVICES
def get_serial(self):
"""
Returns the comma-issued dongle ID from our provisioning
"""
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xd0, 0, 0, 0x20)
hashsig, calc_hash = dat[0x1c:], hashlib.sha1(dat[0:0x1c]).digest()[0:4]
assert(hashsig == calc_hash)
return [dat[0:0x10].decode("utf8"), dat[0x10:0x10 + 10].decode("utf8")]
def get_usb_serial(self):
"""
Returns the serial number reported from the USB descriptor;
matches the MCU UID
"""
return self._serial
def get_dfu_serial(self):
return PandaDFU.st_serial_to_dfu_serial(self._serial, self._mcu_type)
def get_uid(self):
"""
Returns the UID from the MCU
"""
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xc3, 0, 0, 12)
return binascii.hexlify(dat).decode()
def get_secret(self):
return self._handle.controlRead(Panda.REQUEST_IN, 0xd0, 1, 0, 0x10)
def get_interrupt_call_rate(self, irqnum):
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xc4, int(irqnum), 0, 4)
return struct.unpack("I", dat)[0]
# ******************* configuration *******************
def set_power_save(self, power_save_enabled=0):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe7, int(power_save_enabled), 0, b'')
def enable_deepsleep(self):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xfb, 0, 0, b'')
def set_safety_mode(self, mode=SAFETY_SILENT, param=0):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xdc, mode, param, b'')
def set_gmlan(self, bus=2):
# TODO: check panda type
if bus is None:
self._handle.controlWrite(Panda.REQUEST_OUT, 0xdb, 0, 0, b'')
elif bus in (Panda.GMLAN_CAN2, Panda.GMLAN_CAN3):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xdb, 1, bus, b'')
def set_obd(self, obd):
# TODO: check panda type
self._handle.controlWrite(Panda.REQUEST_OUT, 0xdb, int(obd), 0, b'')
def set_can_loopback(self, enable):
# set can loopback mode for all buses
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe5, int(enable), 0, b'')
def set_can_enable(self, bus_num, enable):
# sets the can transceiver enable pin
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf4, int(bus_num), int(enable), b'')
def set_can_speed_kbps(self, bus, speed):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xde, bus, int(speed * 10), b'')
def set_can_data_speed_kbps(self, bus, speed):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf9, bus, int(speed * 10), b'')
def set_canfd_non_iso(self, bus, non_iso):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xfc, bus, int(non_iso), b'')
def set_uart_baud(self, uart, rate):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe4, uart, int(rate / 300), b'')
def set_uart_parity(self, uart, parity):
# parity, 0=off, 1=even, 2=odd
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe2, uart, parity, b'')
def set_uart_callback(self, uart, install):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe3, uart, int(install), b'')
# ******************* can *******************
# The panda will NAK CAN writes when there is CAN congestion.
# libusb will try to send it again, with a max timeout.
# Timeout is in ms. If set to 0, the timeout is infinite.
CAN_SEND_TIMEOUT_MS = 10
def can_reset_communications(self):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xc0, 0, 0, b'')
@ensure_can_packet_version
def can_send_many(self, arr, timeout=CAN_SEND_TIMEOUT_MS):
snds = pack_can_buffer(arr)
while True:
try:
for tx in snds:
while True:
bs = self._handle.bulkWrite(3, tx, timeout=timeout)
tx = tx[bs:]
if len(tx) == 0:
break
logging.error("CAN: PARTIAL SEND MANY, RETRYING")
break
except (usb1.USBErrorIO, usb1.USBErrorOverflow):
logging.error("CAN: BAD SEND MANY, RETRYING")
def can_send(self, addr, dat, bus, timeout=CAN_SEND_TIMEOUT_MS):
self.can_send_many([[addr, None, dat, bus]], timeout=timeout)
@ensure_can_packet_version
def can_recv(self):
dat = bytearray()
while True:
try:
dat = self._handle.bulkRead(1, 16384) # Max receive batch size + 2 extra reserve frames
break
except (usb1.USBErrorIO, usb1.USBErrorOverflow):
logging.error("CAN: BAD RECV, RETRYING")
time.sleep(0.1)
msgs, self.can_rx_overflow_buffer = unpack_can_buffer(self.can_rx_overflow_buffer + dat)
return msgs
def can_clear(self, bus):
"""Clears all messages from the specified internal CAN ringbuffer as
though it were drained.
Args:
bus (int): can bus number to clear a tx queue, or 0xFFFF to clear the
global can rx queue.
"""
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf1, bus, 0, b'')
# ******************* isotp *******************
def isotp_send(self, addr, dat, bus, recvaddr=None, subaddr=None):
return isotp_send(self, dat, addr, bus, recvaddr, subaddr)
def isotp_recv(self, addr, bus=0, sendaddr=None, subaddr=None):
return isotp_recv(self, addr, bus, sendaddr, subaddr)
# ******************* serial *******************
def serial_read(self, port_number):
ret = []
while 1:
lret = bytes(self._handle.controlRead(Panda.REQUEST_IN, 0xe0, port_number, 0, 0x40))
if len(lret) == 0:
break
ret.append(lret)
return b''.join(ret)
def serial_write(self, port_number, ln):
ret = 0
if isinstance(ln, str):
ln = bytes(ln, 'utf-8')
for i in range(0, len(ln), 0x20):
ret += self._handle.bulkWrite(2, struct.pack("B", port_number) + ln[i:i + 0x20])
return ret
def serial_clear(self, port_number):
"""Clears all messages (tx and rx) from the specified internal uart
ringbuffer as though it were drained.
Args:
port_number (int): port number of the uart to clear.
"""
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf2, port_number, 0, b'')
def send_heartbeat(self, engaged=True):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf3, engaged, 0, b'')
# disable heartbeat checks for use outside of openpilot
# sending a heartbeat will reenable the checks
def set_heartbeat_disabled(self):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf8, 0, 0, b'')
# ******************* RTC *******************
def set_datetime(self, dt):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xa1, int(dt.year), 0, b'')
self._handle.controlWrite(Panda.REQUEST_OUT, 0xa2, int(dt.month), 0, b'')
self._handle.controlWrite(Panda.REQUEST_OUT, 0xa3, int(dt.day), 0, b'')
self._handle.controlWrite(Panda.REQUEST_OUT, 0xa4, int(dt.isoweekday()), 0, b'')
self._handle.controlWrite(Panda.REQUEST_OUT, 0xa5, int(dt.hour), 0, b'')
self._handle.controlWrite(Panda.REQUEST_OUT, 0xa6, int(dt.minute), 0, b'')
self._handle.controlWrite(Panda.REQUEST_OUT, 0xa7, int(dt.second), 0, b'')
def get_datetime(self):
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xa0, 0, 0, 8)
a = struct.unpack("HBBBBBB", dat)
return datetime.datetime(a[0], a[1], a[2], a[4], a[5], a[6])
# ****************** Timer *****************
def get_microsecond_timer(self):
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xa8, 0, 0, 4)
return struct.unpack("I", dat)[0]
# ******************* IR *******************
def set_ir_power(self, percentage):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xb0, int(percentage), 0, b'')
# ******************* Fan ******************
def set_fan_power(self, percentage):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xb1, int(percentage), 0, b'')
def get_fan_rpm(self):
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xb2, 0, 0, 2)
a = struct.unpack("H", dat)
return a[0]
# ****************** Siren *****************
def set_siren(self, enabled):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf6, int(enabled), 0, b'')
# ****************** Debug *****************
def set_green_led(self, enabled):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf7, int(enabled), 0, b'')
def set_clock_source_period(self, period):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe6, period, 0, b'')
def force_relay_drive(self, intercept_relay_drive, ignition_relay_drive):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xc5, (int(intercept_relay_drive) | int(ignition_relay_drive) << 1), 0, b'')
def read_som_gpio(self) -> bool:
r = self._handle.controlRead(Panda.REQUEST_IN, 0xc6, 0, 0, 1)
return r[0] == 1

61
panda/python/base.py Normal file
View File

@@ -0,0 +1,61 @@
from abc import ABC, abstractmethod
from .constants import McuType
TIMEOUT = int(15 * 1e3) # default timeout, in milliseconds
class BaseHandle(ABC):
"""
A handle to talk to a panda.
Borrows heavily from the libusb1 handle API.
"""
@abstractmethod
def close(self) -> None:
...
@abstractmethod
def controlWrite(self, request_type: int, request: int, value: int, index: int, data, timeout: int = TIMEOUT, expect_disconnect: bool = False):
...
@abstractmethod
def controlRead(self, request_type: int, request: int, value: int, index: int, length: int, timeout: int = TIMEOUT) -> bytes:
...
@abstractmethod
def bulkWrite(self, endpoint: int, data: bytes, timeout: int = TIMEOUT) -> int:
...
@abstractmethod
def bulkRead(self, endpoint: int, length: int, timeout: int = TIMEOUT) -> bytes:
...
class BaseSTBootloaderHandle(ABC):
"""
A handle to talk to a panda while it's in the STM32 bootloader.
"""
@abstractmethod
def get_mcu_type(self) -> McuType:
...
@abstractmethod
def close(self) -> None:
...
@abstractmethod
def clear_status(self) -> None:
...
@abstractmethod
def program(self, address: int, dat: bytes) -> None:
...
@abstractmethod
def erase_sector(self, sector: int) -> None:
...
@abstractmethod
def jump(self, address: int) -> None:
...

53
panda/python/canhandle.py Normal file
View File

@@ -0,0 +1,53 @@
import struct
import signal
from .base import BaseHandle
class CanHandle(BaseHandle):
def __init__(self, p, bus):
self.p = p
self.bus = bus
def transact(self, dat):
def _handle_timeout(signum, frame):
# will happen on reset or can error
raise TimeoutError
signal.signal(signal.SIGALRM, _handle_timeout)
signal.alarm(1)
try:
self.p.isotp_send(1, dat, self.bus, recvaddr=2)
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, _handle_timeout)
signal.alarm(1)
try:
ret = self.p.isotp_recv(2, self.bus, sendaddr=1)
finally:
signal.alarm(0)
return ret
def close(self):
pass
def controlWrite(self, request_type, request, value, index, data, timeout=0, expect_disconnect=False):
# ignore data in reply, panda doesn't use it
return self.controlRead(request_type, request, value, index, 0, timeout)
def controlRead(self, request_type, request, value, index, length, timeout=0):
dat = struct.pack("HHBBHHH", 0, 0, request_type, request, value, index, length)
return self.transact(dat)
def bulkWrite(self, endpoint, data, timeout=0):
if len(data) > 0x10:
raise ValueError("Data must not be longer than 0x10")
dat = struct.pack("HH", endpoint, len(data)) + data
return self.transact(dat)
def bulkRead(self, endpoint, length, timeout=0):
dat = struct.pack("HH", endpoint, 0)
return self.transact(dat)

361
panda/python/ccp.py Normal file
View File

@@ -0,0 +1,361 @@
import sys
import time
import struct
from enum import IntEnum, Enum
class COMMAND_CODE(IntEnum):
CONNECT = 0x01
SET_MTA = 0x02
DNLOAD = 0x03
UPLOAD = 0x04
TEST = 0x05
START_STOP = 0x06
DISCONNECT = 0x07
START_STOP_ALL = 0x08
GET_ACTIVE_CAL_PAGE = 0x09
SET_S_STATUS = 0x0C
GET_S_STATUS = 0x0D
BUILD_CHKSUM = 0x0E
SHORT_UP = 0x0F
CLEAR_MEMORY = 0x10
SELECT_CAL_PAGE = 0x11
GET_SEED = 0x12
UNLOCK = 0x13
GET_DAQ_SIZE = 0x14
SET_DAQ_PTR = 0x15
WRITE_DAQ = 0x16
EXCHANGE_ID = 0x17
PROGRAM = 0x18
MOVE = 0x19
GET_CCP_VERSION = 0x1B
DIAG_SERVICE = 0x20
ACTION_SERVICE = 0x21
PROGRAM_6 = 0x22
DNLOAD_6 = 0x23
COMMAND_RETURN_CODES = {
0x00: "acknowledge / no error",
0x01: "DAQ processor overload",
0x10: "command processor busy",
0x11: "DAQ processor busy",
0x12: "internal timeout",
0x18: "key request",
0x19: "session status request",
0x20: "cold start request",
0x21: "cal. data init. request",
0x22: "DAQ list init. request",
0x23: "code update request",
0x30: "unknown command",
0x31: "command syntax",
0x32: "parameter(s) out of range",
0x33: "access denied",
0x34: "overload",
0x35: "access locked",
0x36: "resource/function not available",
}
class BYTE_ORDER(Enum):
LITTLE_ENDIAN = '<'
BIG_ENDIAN = '>'
class CommandTimeoutError(Exception):
pass
class CommandCounterError(Exception):
pass
class CommandResponseError(Exception):
def __init__(self, message, return_code):
super().__init__()
self.message = message
self.return_code = return_code
def __str__(self):
return self.message
class CcpClient():
def __init__(self, panda, tx_addr: int, rx_addr: int, bus: int=0, byte_order: BYTE_ORDER=BYTE_ORDER.BIG_ENDIAN, debug=False):
self.tx_addr = tx_addr
self.rx_addr = rx_addr
self.can_bus = bus
self.byte_order = byte_order
self.debug = debug
self._panda = panda
self._command_counter = -1
def _send_cro(self, cmd: int, dat: bytes = b"") -> None:
self._command_counter = (self._command_counter + 1) & 0xFF
tx_data = (bytes([cmd, self._command_counter]) + dat).ljust(8, b"\x00")
if self.debug:
print(f"CAN-TX: {hex(self.tx_addr)} - 0x{bytes.hex(tx_data)}")
assert len(tx_data) == 8, "data is not 8 bytes"
self._panda.can_clear(self.can_bus)
self._panda.can_clear(0xFFFF)
self._panda.can_send(self.tx_addr, tx_data, self.can_bus)
def _recv_dto(self, timeout: float) -> bytes:
start_time = time.time()
while time.time() - start_time < timeout:
msgs = self._panda.can_recv() or []
if len(msgs) >= 256:
print("CAN RX buffer overflow!!!", file=sys.stderr)
for rx_addr, _, rx_data_bytearray, rx_bus in msgs:
if rx_bus == self.can_bus and rx_addr == self.rx_addr:
rx_data = bytes(rx_data_bytearray)
if self.debug:
print(f"CAN-RX: {hex(rx_addr)} - 0x{bytes.hex(rx_data)}")
assert len(rx_data) == 8, f"message length not 8: {len(rx_data)}"
pid = rx_data[0]
if pid == 0xFF or pid == 0xFE:
err = rx_data[1]
err_desc = COMMAND_RETURN_CODES.get(err, "unknown error")
ctr = rx_data[2]
dat = rx_data[3:]
if pid == 0xFF and self._command_counter != ctr:
raise CommandCounterError(f"counter invalid: {ctr} != {self._command_counter}")
if err >= 0x10 and err <= 0x12:
if self.debug:
print(f"CCP-WAIT: {hex(err)} - {err_desc}")
start_time = time.time()
continue
if err >= 0x30:
raise CommandResponseError(f"{hex(err)} - {err_desc}", err)
else:
dat = rx_data[1:]
return dat
time.sleep(0.001)
raise CommandTimeoutError("timeout waiting for response")
# commands
def connect(self, station_addr: int) -> None:
if station_addr > 65535:
raise ValueError("station address must be less than 65536")
# NOTE: station address is always little endian
self._send_cro(COMMAND_CODE.CONNECT, struct.pack("<H", station_addr))
self._recv_dto(0.025)
def exchange_station_ids(self, device_id_info: bytes = b"") -> dict:
self._send_cro(COMMAND_CODE.EXCHANGE_ID, device_id_info)
resp = self._recv_dto(0.025)
return { # TODO: define a type
"id_length": resp[0],
"data_type": resp[1],
"available": resp[2],
"protected": resp[3],
}
def get_seed(self, resource_mask: int) -> bytes:
if resource_mask > 255:
raise ValueError("resource mask must be less than 256")
self._send_cro(COMMAND_CODE.GET_SEED)
resp = self._recv_dto(0.025)
# protected = resp[0] == 0
seed = resp[1:]
return seed
def unlock(self, key: bytes) -> int:
if len(key) > 6:
raise ValueError("max key size is 6 bytes")
self._send_cro(COMMAND_CODE.UNLOCK, key)
resp = self._recv_dto(0.025)
status = resp[0]
return status
def set_memory_transfer_address(self, mta_num: int, addr_ext: int, addr: int) -> None:
if mta_num > 255:
raise ValueError("MTA number must be less than 256")
if addr_ext > 255:
raise ValueError("address extension must be less than 256")
self._send_cro(COMMAND_CODE.SET_MTA, bytes([mta_num, addr_ext]) + struct.pack(f"{self.byte_order.value}I", addr))
self._recv_dto(0.025)
def download(self, data: bytes) -> int:
if len(data) > 5:
raise ValueError("max data size is 5 bytes")
self._send_cro(COMMAND_CODE.DNLOAD, bytes([len(data)]) + data)
resp = self._recv_dto(0.025)
# mta_addr_ext = resp[0]
mta_addr = struct.unpack(f"{self.byte_order.value}I", resp[1:5])[0]
return mta_addr # type: ignore
def download_6_bytes(self, data: bytes) -> int:
if len(data) != 6:
raise ValueError("data size must be 6 bytes")
self._send_cro(COMMAND_CODE.DNLOAD_6, data)
resp = self._recv_dto(0.025)
# mta_addr_ext = resp[0]
mta_addr = struct.unpack(f"{self.byte_order.value}I", resp[1:5])[0]
return mta_addr # type: ignore
def upload(self, size: int) -> bytes:
if size > 5:
raise ValueError("size must be less than 6")
self._send_cro(COMMAND_CODE.UPLOAD, bytes([size]))
return self._recv_dto(0.025)
def short_upload(self, size: int, addr_ext: int, addr: int) -> bytes:
if size > 5:
raise ValueError("size must be less than 6")
if addr_ext > 255:
raise ValueError("address extension must be less than 256")
self._send_cro(COMMAND_CODE.SHORT_UP, bytes([size, addr_ext]) + struct.pack(f"{self.byte_order.value}I", addr))
return self._recv_dto(0.025)
def select_calibration_page(self) -> None:
self._send_cro(COMMAND_CODE.SELECT_CAL_PAGE)
self._recv_dto(0.025)
def get_daq_list_size(self, list_num: int, can_id: int = 0) -> dict:
if list_num > 255:
raise ValueError("list number must be less than 256")
self._send_cro(COMMAND_CODE.GET_DAQ_SIZE, bytes([list_num, 0]) + struct.pack(f"{self.byte_order.value}I", can_id))
resp = self._recv_dto(0.025)
return { # TODO: define a type
"list_size": resp[0],
"first_pid": resp[1],
}
def set_daq_list_pointer(self, list_num: int, odt_num: int, element_num: int) -> None:
if list_num > 255:
raise ValueError("list number must be less than 256")
if odt_num > 255:
raise ValueError("ODT number must be less than 256")
if element_num > 255:
raise ValueError("element number must be less than 256")
self._send_cro(COMMAND_CODE.SET_DAQ_PTR, bytes([list_num, odt_num, element_num]))
self._recv_dto(0.025)
def write_daq_list_entry(self, size: int, addr_ext: int, addr: int) -> None:
if size > 255:
raise ValueError("size must be less than 256")
if addr_ext > 255:
raise ValueError("address extension must be less than 256")
self._send_cro(COMMAND_CODE.WRITE_DAQ, bytes([size, addr_ext]) + struct.pack(f"{self.byte_order.value}I", addr))
self._recv_dto(0.025)
def start_stop_transmission(self, mode: int, list_num: int, odt_num: int, channel_num: int, rate_prescaler: int = 0) -> None:
if mode > 255:
raise ValueError("mode must be less than 256")
if list_num > 255:
raise ValueError("list number must be less than 256")
if odt_num > 255:
raise ValueError("ODT number must be less than 256")
if channel_num > 255:
raise ValueError("channel number must be less than 256")
if rate_prescaler > 65535:
raise ValueError("rate prescaler must be less than 65536")
self._send_cro(COMMAND_CODE.START_STOP, bytes([mode, list_num, odt_num, channel_num]) + struct.pack(f"{self.byte_order.value}H", rate_prescaler))
self._recv_dto(0.025)
def disconnect(self, station_addr: int, temporary: bool = False) -> None:
if station_addr > 65535:
raise ValueError("station address must be less than 65536")
# NOTE: station address is always little endian
self._send_cro(COMMAND_CODE.DISCONNECT, bytes([int(not temporary), 0x00]) + struct.pack("<H", station_addr))
self._recv_dto(0.025)
def set_session_status(self, status: int) -> None:
if status > 255:
raise ValueError("status must be less than 256")
self._send_cro(COMMAND_CODE.SET_S_STATUS, bytes([status]))
self._recv_dto(0.025)
def get_session_status(self) -> dict:
self._send_cro(COMMAND_CODE.GET_S_STATUS)
resp = self._recv_dto(0.025)
return { # TODO: define a type
"status": resp[0],
"info": resp[2] if resp[1] else None,
}
def build_checksum(self, size: int) -> bytes:
self._send_cro(COMMAND_CODE.BUILD_CHKSUM, struct.pack(f"{self.byte_order.value}I", size))
resp = self._recv_dto(30.0)
chksum_size = resp[0]
assert chksum_size <= 4, "checksum more than 4 bytes"
chksum = resp[1:1+chksum_size]
return chksum
def clear_memory(self, size: int) -> None:
self._send_cro(COMMAND_CODE.CLEAR_MEMORY, struct.pack(f"{self.byte_order.value}I", size))
self._recv_dto(30.0)
def program(self, size: int, data: bytes) -> int:
if size > 5:
raise ValueError("size must be less than 6")
if len(data) > 5:
raise ValueError("max data size is 5 bytes")
self._send_cro(COMMAND_CODE.PROGRAM, bytes([size]) + data)
resp = self._recv_dto(0.1)
# mta_addr_ext = resp[0]
mta_addr = struct.unpack(f"{self.byte_order.value}I", resp[1:5])[0]
return mta_addr # type: ignore
def program_6_bytes(self, data: bytes) -> int:
if len(data) != 6:
raise ValueError("data size must be 6 bytes")
self._send_cro(COMMAND_CODE.PROGRAM_6, data)
resp = self._recv_dto(0.1)
# mta_addr_ext = resp[0]
mta_addr = struct.unpack(f"{self.byte_order.value}I", resp[1:5])[0]
return mta_addr # type: ignore
def move_memory_block(self, size: int) -> None:
self._send_cro(COMMAND_CODE.MOVE, struct.pack(f"{self.byte_order.value}I", size))
self._recv_dto(0.025)
def diagnostic_service(self, service_num: int, data: bytes = b"") -> dict:
if service_num > 65535:
raise ValueError("service number must be less than 65536")
if len(data) > 4:
raise ValueError("max data size is 4 bytes")
self._send_cro(COMMAND_CODE.DIAG_SERVICE, struct.pack(f"{self.byte_order.value}H", service_num) + data)
resp = self._recv_dto(0.025)
return { # TODO: define a type
"length": resp[0],
"type": resp[1],
}
def action_service(self, service_num: int, data: bytes = b"") -> dict:
if service_num > 65535:
raise ValueError("service number must be less than 65536")
if len(data) > 4:
raise ValueError("max data size is 4 bytes")
self._send_cro(COMMAND_CODE.ACTION_SERVICE, struct.pack(f"{self.byte_order.value}H", service_num) + data)
resp = self._recv_dto(0.025)
return { # TODO: define a type
"length": resp[0],
"type": resp[1],
}
def test_availability(self, station_addr: int) -> None:
if station_addr > 65535:
raise ValueError("station address must be less than 65536")
# NOTE: station address is always little endian
self._send_cro(COMMAND_CODE.TEST, struct.pack("<H", station_addr))
self._recv_dto(0.025)
def start_stop_synchronised_transmission(self, mode: int) -> None:
if mode > 255:
raise ValueError("mode must be less than 256")
self._send_cro(COMMAND_CODE.START_STOP_ALL, bytes([mode]))
self._recv_dto(0.025)
def get_active_calibration_page(self):
self._send_cro(COMMAND_CODE.GET_ACTIVE_CAL_PAGE)
resp = self._recv_dto(0.025)
# cal_addr_ext = resp[0]
cal_addr = struct.unpack(f"{self.byte_order.value}I", resp[1:5])[0]
return cal_addr
def get_version(self, desired_version: float = 2.1) -> float:
major, minor = map(int, str(desired_version).split("."))
self._send_cro(COMMAND_CODE.GET_CCP_VERSION, bytes([major, minor]))
resp = self._recv_dto(0.025)
return float(f"{resp[0]}.{resp[1]}")

65
panda/python/constants.py Normal file
View File

@@ -0,0 +1,65 @@
import os
import enum
from typing import List, NamedTuple
BASEDIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "../")
FW_PATH = os.path.join(BASEDIR, "board/obj/")
USBPACKET_MAX_SIZE = 0x40
class McuConfig(NamedTuple):
mcu: str
mcu_idcode: int
sector_sizes: List[int]
sector_count: int # total sector count, used for MCU identification in DFU mode
uid_address: int
block_size: int
serial_number_address: int
app_address: int
app_fn: str
bootstub_address: int
bootstub_fn: str
def sector_address(self, i):
# assume bootstub is in sector 0
return self.bootstub_address + sum(self.sector_sizes[:i])
F4Config = McuConfig(
"STM32F4",
0x463,
[0x4000 for _ in range(4)] + [0x10000] + [0x20000 for _ in range(11)],
16,
0x1FFF7A10,
0x800,
0x1FFF79C0,
0x8004000,
"panda.bin.signed",
0x8000000,
"bootstub.panda.bin",
)
H7Config = McuConfig(
"STM32H7",
0x483,
[0x20000 for _ in range(7)],
8,
0x1FF1E800,
0x400,
# there is an 8th sector, but we use that for the provisioning chunk, so don't program over that!
0x080FFFC0,
0x8020000,
"panda_h7.bin.signed",
0x8000000,
"bootstub.panda_h7.bin",
)
@enum.unique
class McuType(enum.Enum):
F4 = F4Config
H7 = H7Config
@property
def config(self):
return self.value
MCU_TYPE_BY_IDCODE = {m.config.mcu_idcode: m for m in McuType}

136
panda/python/dfu.py Normal file
View File

@@ -0,0 +1,136 @@
import os
import usb1
import struct
import binascii
from typing import List, Optional
from .base import BaseSTBootloaderHandle
from .spi import STBootloaderSPIHandle, PandaSpiException
from .usb import STBootloaderUSBHandle
from .constants import FW_PATH, McuType
class PandaDFU:
def __init__(self, dfu_serial: Optional[str]):
# try USB, then SPI
handle: Optional[BaseSTBootloaderHandle]
self._context, handle = PandaDFU.usb_connect(dfu_serial)
if handle is None:
self._context, handle = PandaDFU.spi_connect(dfu_serial)
if handle is None:
raise Exception(f"failed to open DFU device {dfu_serial}")
self._handle: BaseSTBootloaderHandle = handle
self._mcu_type: McuType = self._handle.get_mcu_type()
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
def close(self):
if self._handle is not None:
self._handle.close()
self._handle = None
if self._context is not None:
self._context.close()
@staticmethod
def usb_connect(dfu_serial: Optional[str]):
handle = None
context = usb1.USBContext()
context.open()
for device in context.getDeviceList(skip_on_error=True):
if device.getVendorID() == 0x0483 and device.getProductID() == 0xdf11:
try:
this_dfu_serial = device.open().getASCIIStringDescriptor(3)
except Exception:
continue
if this_dfu_serial == dfu_serial or dfu_serial is None:
handle = STBootloaderUSBHandle(device, device.open())
break
return context, handle
@staticmethod
def spi_connect(dfu_serial: Optional[str]):
handle = None
this_dfu_serial = None
try:
handle = STBootloaderSPIHandle()
this_dfu_serial = PandaDFU.st_serial_to_dfu_serial(handle.get_uid(), handle.get_mcu_type())
except PandaSpiException:
handle = None
if dfu_serial is not None and dfu_serial != this_dfu_serial:
handle = None
return None, handle
@staticmethod
def list() -> List[str]:
ret = PandaDFU.usb_list()
ret += PandaDFU.spi_list()
return list(set(ret))
@staticmethod
def usb_list() -> List[str]:
dfu_serials = []
try:
with usb1.USBContext() as context:
for device in context.getDeviceList(skip_on_error=True):
if device.getVendorID() == 0x0483 and device.getProductID() == 0xdf11:
try:
dfu_serials.append(device.open().getASCIIStringDescriptor(3))
except Exception:
pass
except Exception:
pass
return dfu_serials
@staticmethod
def spi_list() -> List[str]:
try:
_, h = PandaDFU.spi_connect(None)
if h is not None:
dfu_serial = PandaDFU.st_serial_to_dfu_serial(h.get_uid(), h.get_mcu_type())
return [dfu_serial, ]
except PandaSpiException:
pass
return []
@staticmethod
def st_serial_to_dfu_serial(st: str, mcu_type: McuType = McuType.F4):
if st is None or st == "none":
return None
uid_base = struct.unpack("H" * 6, bytes.fromhex(st))
if mcu_type == McuType.H7:
return binascii.hexlify(struct.pack("!HHH", uid_base[1] + uid_base[5], uid_base[0] + uid_base[4], uid_base[3])).upper().decode("utf-8")
else:
return binascii.hexlify(struct.pack("!HHH", uid_base[1] + uid_base[5], uid_base[0] + uid_base[4] + 0xA, uid_base[3])).upper().decode("utf-8")
def get_mcu_type(self) -> McuType:
return self._mcu_type
def reset(self):
self._handle.jump(self._mcu_type.config.bootstub_address)
def program_bootstub(self, code_bootstub):
self._handle.clear_status()
# erase all sectors
for i in range(len(self._mcu_type.config.sector_sizes)):
self._handle.erase_sector(i)
self._handle.program(self._mcu_type.config.bootstub_address, code_bootstub)
def recover(self):
fn = os.path.join(FW_PATH, self._mcu_type.config.bootstub_fn)
with open(fn, "rb") as f:
code = f.read()
self.program_bootstub(code)
self.reset()

140
panda/python/isotp.py Normal file
View File

@@ -0,0 +1,140 @@
import binascii
import time
DEBUG = False
def msg(x):
if DEBUG:
print("S:", binascii.hexlify(x))
assert len(x) <= 7
ret = bytes([len(x)]) + x
return ret.ljust(8, b"\x00")
kmsgs = []
def recv(panda, cnt, addr, nbus):
global kmsgs
ret = []
while len(ret) < cnt:
kmsgs += panda.can_recv()
nmsgs = []
for ids, ts, dat, bus in kmsgs:
if ids == addr and bus == nbus and len(ret) < cnt:
ret.append(dat)
else:
# leave around
nmsgs.append((ids, ts, dat, bus))
kmsgs = nmsgs[-256:]
return ret
def isotp_recv_subaddr(panda, addr, bus, sendaddr, subaddr):
msg = recv(panda, 1, addr, bus)[0]
# TODO: handle other subaddr also communicating
assert msg[0] == subaddr
if msg[1] & 0xf0 == 0x10:
# first
tlen = ((msg[1] & 0xf) << 8) | msg[2]
dat = msg[3:]
# 0 block size?
CONTINUE = bytes([subaddr]) + b"\x30" + b"\x00" * 6
panda.can_send(sendaddr, CONTINUE, bus)
idx = 1
for mm in recv(panda, (tlen - len(dat) + 5) // 6, addr, bus):
assert mm[0] == subaddr
assert mm[1] == (0x20 | (idx & 0xF))
dat += mm[2:]
idx += 1
elif msg[1] & 0xf0 == 0x00:
# single
tlen = msg[1] & 0xf
dat = msg[2:]
else:
print(binascii.hexlify(msg))
raise AssertionError
return dat[0:tlen]
# **** import below this line ****
def isotp_send(panda, x, addr, bus=0, recvaddr=None, subaddr=None, rate=None):
if recvaddr is None:
recvaddr = addr + 8
if len(x) <= 7 and subaddr is None:
panda.can_send(addr, msg(x), bus)
elif len(x) <= 6 and subaddr is not None:
panda.can_send(addr, bytes([subaddr]) + msg(x)[0:7], bus)
else:
if subaddr:
ss = bytes([subaddr, 0x10 + (len(x) >> 8), len(x) & 0xFF]) + x[0:5]
x = x[5:]
else:
ss = bytes([0x10 + (len(x) >> 8), len(x) & 0xFF]) + x[0:6]
x = x[6:]
idx = 1
sends = []
while len(x) > 0:
if subaddr:
sends.append(((bytes([subaddr, 0x20 + (idx & 0xF)]) + x[0:6]).ljust(8, b"\x00")))
x = x[6:]
else:
sends.append(((bytes([0x20 + (idx & 0xF)]) + x[0:7]).ljust(8, b"\x00")))
x = x[7:]
idx += 1
# actually send
panda.can_send(addr, ss, bus)
rr = recv(panda, 1, recvaddr, bus)[0]
if rr.find(b"\x30\x01") != -1:
for s in sends[:-1]:
panda.can_send(addr, s, 0)
rr = recv(panda, 1, recvaddr, bus)[0]
panda.can_send(addr, sends[-1], 0)
else:
if rate is None:
panda.can_send_many([(addr, None, s, bus) for s in sends])
else:
for dat in sends:
panda.can_send(addr, dat, bus)
time.sleep(rate)
def isotp_recv(panda, addr, bus=0, sendaddr=None, subaddr=None):
if sendaddr is None:
sendaddr = addr - 8
if subaddr is not None:
dat = isotp_recv_subaddr(panda, addr, bus, sendaddr, subaddr)
else:
msg = recv(panda, 1, addr, bus)[0]
if msg[0] & 0xf0 == 0x10:
# first
tlen = ((msg[0] & 0xf) << 8) | msg[1]
dat = msg[2:]
# 0 block size?
CONTINUE = b"\x30" + b"\x00" * 7
panda.can_send(sendaddr, CONTINUE, bus)
idx = 1
for mm in recv(panda, (tlen - len(dat) + 6) // 7, addr, bus):
assert mm[0] == (0x20 | (idx & 0xF))
dat += mm[1:]
idx += 1
elif msg[0] & 0xf0 == 0x00:
# single
tlen = msg[0] & 0xf
dat = msg[1:]
else:
raise AssertionError
dat = dat[0:tlen]
if DEBUG:
print("R:", binascii.hexlify(dat))
return dat

35
panda/python/serial.py Normal file
View File

@@ -0,0 +1,35 @@
# mimic a python serial port
class PandaSerial(object):
def __init__(self, panda, port, baud):
self.panda = panda
self.port = port
self.panda.set_uart_parity(self.port, 0)
self._baudrate = baud
self.panda.set_uart_baud(self.port, baud)
self.buf = b""
def read(self, l=1):
tt = self.panda.serial_read(self.port)
if len(tt) > 0:
self.buf += tt
ret = self.buf[0:l]
self.buf = self.buf[l:]
return ret
def write(self, dat):
return self.panda.serial_write(self.port, dat)
def close(self):
pass
def flush(self):
pass
@property
def baudrate(self):
return self._baudrate
@baudrate.setter
def baudrate(self, value):
self.panda.set_uart_baud(self.port, value)
self._baudrate = value

438
panda/python/spi.py Normal file
View File

@@ -0,0 +1,438 @@
import binascii
import ctypes
import os
import fcntl
import math
import time
import struct
import logging
import threading
from contextlib import contextmanager
from functools import reduce
from typing import Callable, List, Optional
from .base import BaseHandle, BaseSTBootloaderHandle, TIMEOUT
from .constants import McuType, MCU_TYPE_BY_IDCODE, USBPACKET_MAX_SIZE
try:
import spidev
except ImportError:
spidev = None
# Constants
SYNC = 0x5A
HACK = 0x79
DACK = 0x85
NACK = 0x1F
CHECKSUM_START = 0xAB
MIN_ACK_TIMEOUT_MS = 100
MAX_XFER_RETRY_COUNT = 5
XFER_SIZE = 0x40*31
DEV_PATH = "/dev/spidev0.0"
def crc8(data):
crc = 0xFF # standard init value
poly = 0xD5 # standard crc8: x8+x7+x6+x4+x2+1
size = len(data)
for i in range(size - 1, -1, -1):
crc ^= data[i]
for _ in range(8):
if ((crc & 0x80) != 0):
crc = ((crc << 1) ^ poly) & 0xFF
else:
crc <<= 1
return crc
class PandaSpiException(Exception):
pass
class PandaProtocolMismatch(PandaSpiException):
pass
class PandaSpiUnavailable(PandaSpiException):
pass
class PandaSpiNackResponse(PandaSpiException):
pass
class PandaSpiMissingAck(PandaSpiException):
pass
class PandaSpiBadChecksum(PandaSpiException):
pass
class PandaSpiTransferFailed(PandaSpiException):
pass
SPI_LOCK = threading.Lock()
class PandaSpiTransfer(ctypes.Structure):
_fields_ = [
('rx_buf', ctypes.c_uint64),
('tx_buf', ctypes.c_uint64),
('tx_length', ctypes.c_uint32),
('rx_length_max', ctypes.c_uint32),
('timeout', ctypes.c_uint32),
('endpoint', ctypes.c_uint8),
('expect_disconnect', ctypes.c_uint8),
]
class SpiDevice:
"""
Provides locked, thread-safe access to a panda's SPI interface.
"""
# 50MHz is the max of the 845. older rev comma three
# may not support the full 50MHz
MAX_SPEED = 50000000
def __init__(self, speed=MAX_SPEED):
assert speed <= self.MAX_SPEED
if not os.path.exists(DEV_PATH):
raise PandaSpiUnavailable(f"SPI device not found: {DEV_PATH}")
if spidev is None:
raise PandaSpiUnavailable("spidev is not installed")
self._spidev = spidev.SpiDev() # pylint: disable=c-extension-no-member
self._spidev.open(0, 0)
self._spidev.max_speed_hz = speed
@contextmanager
def acquire(self):
try:
SPI_LOCK.acquire()
fcntl.flock(self._spidev, fcntl.LOCK_EX)
yield self._spidev
finally:
fcntl.flock(self._spidev, fcntl.LOCK_UN)
SPI_LOCK.release()
def close(self):
self._spidev.close()
class PandaSpiHandle(BaseHandle):
"""
A class that mimics a libusb1 handle for panda SPI communications.
"""
PROTOCOL_VERSION = 2
def __init__(self) -> None:
self.dev = SpiDevice()
self._transfer_raw: Callable[[SpiDevice, int, bytes, int, int, bool], bytes] = self._transfer_spidev
if "KERN" in os.environ:
self._transfer_raw = self._transfer_kernel_driver
self.tx_buf = bytearray(1024)
self.rx_buf = bytearray(1024)
tx_buf_raw = ctypes.c_char.from_buffer(self.tx_buf)
rx_buf_raw = ctypes.c_char.from_buffer(self.rx_buf)
self.ioctl_data = PandaSpiTransfer()
self.ioctl_data.tx_buf = ctypes.addressof(tx_buf_raw)
self.ioctl_data.rx_buf = ctypes.addressof(rx_buf_raw)
self.fileno = self.dev._spidev.fileno()
# helpers
def _calc_checksum(self, data: bytes) -> int:
cksum = CHECKSUM_START
for b in data:
cksum ^= b
return cksum
def _wait_for_ack(self, spi, ack_val: int, timeout: int, tx: int, length: int = 1) -> bytes:
timeout_s = max(MIN_ACK_TIMEOUT_MS, timeout) * 1e-3
start = time.monotonic()
while (timeout == 0) or ((time.monotonic() - start) < timeout_s):
dat = spi.xfer2([tx, ] * length)
if dat[0] == NACK:
raise PandaSpiNackResponse
elif dat[0] == ack_val:
return bytes(dat)
raise PandaSpiMissingAck
def _transfer_spidev(self, spi, endpoint: int, data, timeout: int, max_rx_len: int = 1000, expect_disconnect: bool = False) -> bytes:
max_rx_len = max(USBPACKET_MAX_SIZE, max_rx_len)
logging.debug("- send header")
packet = struct.pack("<BBHH", SYNC, endpoint, len(data), max_rx_len)
packet += bytes([self._calc_checksum(packet), ])
spi.xfer2(packet)
logging.debug("- waiting for header ACK")
self._wait_for_ack(spi, HACK, MIN_ACK_TIMEOUT_MS, 0x11)
logging.debug("- sending data")
packet = bytes([*data, self._calc_checksum(data)])
spi.xfer2(packet)
if expect_disconnect:
logging.debug("- expecting disconnect, returning")
return b""
else:
logging.debug("- waiting for data ACK")
preread_len = USBPACKET_MAX_SIZE + 1 # read enough for a controlRead
dat = self._wait_for_ack(spi, DACK, timeout, 0x13, length=3 + preread_len)
# get response length, then response
response_len = struct.unpack("<H", dat[1:3])[0]
if response_len > max_rx_len:
raise PandaSpiException(f"response length greater than max ({max_rx_len} {response_len})")
# read rest
remaining = (response_len + 1) - preread_len
if remaining > 0:
dat += bytes(spi.readbytes(remaining))
dat = dat[:3 + response_len + 1]
if self._calc_checksum(dat) != 0:
raise PandaSpiBadChecksum
return dat[3:-1]
def _transfer_kernel_driver(self, spi, endpoint: int, data, timeout: int, max_rx_len: int = 1000, expect_disconnect: bool = False) -> bytes:
import spidev2
self.tx_buf[:len(data)] = data
self.ioctl_data.endpoint = endpoint
self.ioctl_data.tx_length = len(data)
self.ioctl_data.rx_length_max = max_rx_len
self.ioctl_data.expect_disconnect = int(expect_disconnect)
# TODO: use our own ioctl request
try:
ret = fcntl.ioctl(self.fileno, spidev2.SPI_IOC_RD_LSB_FIRST, self.ioctl_data)
except OSError as e:
raise PandaSpiException from e
if ret < 0:
raise PandaSpiException(f"ioctl returned {ret}")
return bytes(self.rx_buf[:ret])
def _transfer(self, endpoint: int, data, timeout: int, max_rx_len: int = 1000, expect_disconnect: bool = False) -> bytes:
logging.debug("starting transfer: endpoint=%d, max_rx_len=%d", endpoint, max_rx_len)
logging.debug("==============================================")
n = 0
start_time = time.monotonic()
exc = PandaSpiException()
while (timeout == 0) or (time.monotonic() - start_time) < timeout*1e-3:
n += 1
logging.debug("\ntry #%d", n)
with self.dev.acquire() as spi:
try:
return self._transfer_raw(spi, endpoint, data, timeout, max_rx_len, expect_disconnect)
except PandaSpiException as e:
exc = e
logging.debug("SPI transfer failed, retrying", exc_info=True)
raise exc
def get_protocol_version(self) -> bytes:
vers_str = b"VERSION"
def _get_version(spi) -> bytes:
spi.writebytes(vers_str)
logging.debug("- waiting for echo")
start = time.monotonic()
while True:
version_bytes = spi.readbytes(len(vers_str) + 2)
if bytes(version_bytes).startswith(vers_str):
break
if (time.monotonic() - start) > 0.01:
raise PandaSpiMissingAck
rlen = struct.unpack("<H", bytes(version_bytes[-2:]))[0]
if rlen > 1000:
raise PandaSpiException("response length greater than max")
# get response
dat = spi.readbytes(rlen + 1)
resp = dat[:-1]
calculated_crc = crc8(bytes(version_bytes + resp))
if calculated_crc != dat[-1]:
raise PandaSpiBadChecksum
return bytes(resp)
exc = PandaSpiException()
with self.dev.acquire() as spi:
for _ in range(10):
try:
return _get_version(spi)
except PandaSpiException as e:
exc = e
logging.debug("SPI get protocol version failed, retrying", exc_info=True)
raise exc
# libusb1 functions
def close(self):
self.dev.close()
def controlWrite(self, request_type: int, request: int, value: int, index: int, data, timeout: int = TIMEOUT, expect_disconnect: bool = False):
return self._transfer(0, struct.pack("<BHHH", request, value, index, 0), timeout, expect_disconnect=expect_disconnect)
def controlRead(self, request_type: int, request: int, value: int, index: int, length: int, timeout: int = TIMEOUT):
return self._transfer(0, struct.pack("<BHHH", request, value, index, length), timeout, max_rx_len=length)
def bulkWrite(self, endpoint: int, data: bytes, timeout: int = TIMEOUT) -> int:
for x in range(math.ceil(len(data) / XFER_SIZE)):
self._transfer(endpoint, data[XFER_SIZE*x:XFER_SIZE*(x+1)], timeout)
return len(data)
def bulkRead(self, endpoint: int, length: int, timeout: int = TIMEOUT) -> bytes:
ret = b""
for _ in range(math.ceil(length / XFER_SIZE)):
d = self._transfer(endpoint, [], timeout, max_rx_len=XFER_SIZE)
ret += d
if len(d) < XFER_SIZE:
break
return ret
class STBootloaderSPIHandle(BaseSTBootloaderHandle):
"""
Implementation of the STM32 SPI bootloader protocol described in:
https://www.st.com/resource/en/application_note/an4286-spi-protocol-used-in-the-stm32-bootloader-stmicroelectronics.pdf
"""
SYNC = 0x5A
ACK = 0x79
NACK = 0x1F
def __init__(self):
self.dev = SpiDevice(speed=1000000)
# say hello
try:
with self.dev.acquire() as spi:
spi.xfer([self.SYNC, ])
try:
self._get_ack(spi)
except (PandaSpiNackResponse, PandaSpiMissingAck):
# NACK ok here, will only ACK the first time
pass
self._mcu_type = MCU_TYPE_BY_IDCODE[self.get_chip_id()]
except PandaSpiException:
raise PandaSpiException("failed to connect to panda") from None
def _get_ack(self, spi, timeout=1.0):
data = 0x00
start_time = time.monotonic()
while data not in (self.ACK, self.NACK) and (time.monotonic() - start_time < timeout):
data = spi.xfer([0x00, ])[0]
time.sleep(0.001)
spi.xfer([self.ACK, ])
if data == self.NACK:
raise PandaSpiNackResponse
elif data != self.ACK:
raise PandaSpiMissingAck
def _cmd_no_retry(self, cmd: int, data: Optional[List[bytes]] = None, read_bytes: int = 0, predata=None) -> bytes:
ret = b""
with self.dev.acquire() as spi:
# sync + command
spi.xfer([self.SYNC, ])
spi.xfer([cmd, cmd ^ 0xFF])
self._get_ack(spi, timeout=0.1)
# "predata" - for commands that send the first data without a checksum
if predata is not None:
spi.xfer(predata)
self._get_ack(spi)
# send data
if data is not None:
for d in data:
if predata is not None:
spi.xfer(d + self._checksum(predata + d))
else:
spi.xfer(d + self._checksum(d))
self._get_ack(spi, timeout=20)
# receive
if read_bytes > 0:
ret = spi.xfer([0x00, ]*(read_bytes + 1))[1:]
if data is None or len(data) == 0:
self._get_ack(spi)
return bytes(ret)
def _cmd(self, cmd: int, data: Optional[List[bytes]] = None, read_bytes: int = 0, predata=None) -> bytes:
exc = PandaSpiException()
for n in range(MAX_XFER_RETRY_COUNT):
try:
return self._cmd_no_retry(cmd, data, read_bytes, predata)
except PandaSpiException as e:
exc = e
logging.debug("SPI transfer failed, %d retries left", MAX_XFER_RETRY_COUNT - n - 1, exc_info=True)
raise exc
def _checksum(self, data: bytes) -> bytes:
if len(data) == 1:
ret = data[0] ^ 0xFF
else:
ret = reduce(lambda a, b: a ^ b, data)
return bytes([ret, ])
# *** Bootloader commands ***
def read(self, address: int, length: int):
data = [struct.pack('>I', address), struct.pack('B', length - 1)]
return self._cmd(0x11, data=data, read_bytes=length)
def get_chip_id(self) -> int:
r = self._cmd(0x02, read_bytes=3)
assert r[0] == 1 # response length - 1
return ((r[1] << 8) + r[2])
def go_cmd(self, address: int) -> None:
self._cmd(0x21, data=[struct.pack('>I', address), ])
# *** helpers ***
def get_uid(self):
dat = self.read(McuType.H7.config.uid_address, 12)
return binascii.hexlify(dat).decode()
def erase_sector(self, sector: int):
p = struct.pack('>H', 0) # number of sectors to erase
d = struct.pack('>H', sector)
self._cmd(0x44, data=[d, ], predata=p)
# *** PandaDFU API ***
def get_mcu_type(self):
return self._mcu_type
def clear_status(self):
pass
def close(self):
self.dev.close()
def program(self, address, dat):
bs = 256 # max block size for writing to flash over SPI
dat += b"\xFF" * ((bs - len(dat)) % bs)
for i in range(len(dat) // bs):
block = dat[i * bs:(i + 1) * bs]
self._cmd(0x31, data=[
struct.pack('>I', address + i*bs),
bytes([len(block) - 1]) + block,
])
def jump(self, address):
self.go_cmd(self._mcu_type.config.bootstub_address)

937
panda/python/uds.py Normal file
View File

@@ -0,0 +1,937 @@
import time
import struct
from collections import deque
from typing import Callable, NamedTuple, Tuple, List, Deque, Generator, Optional, cast
from enum import IntEnum
from functools import partial
class SERVICE_TYPE(IntEnum):
DIAGNOSTIC_SESSION_CONTROL = 0x10
ECU_RESET = 0x11
SECURITY_ACCESS = 0x27
COMMUNICATION_CONTROL = 0x28
TESTER_PRESENT = 0x3E
ACCESS_TIMING_PARAMETER = 0x83
SECURED_DATA_TRANSMISSION = 0x84
CONTROL_DTC_SETTING = 0x85
RESPONSE_ON_EVENT = 0x86
LINK_CONTROL = 0x87
READ_DATA_BY_IDENTIFIER = 0x22
READ_MEMORY_BY_ADDRESS = 0x23
READ_SCALING_DATA_BY_IDENTIFIER = 0x24
READ_DATA_BY_PERIODIC_IDENTIFIER = 0x2A
DYNAMICALLY_DEFINE_DATA_IDENTIFIER = 0x2C
WRITE_DATA_BY_IDENTIFIER = 0x2E
WRITE_MEMORY_BY_ADDRESS = 0x3D
CLEAR_DIAGNOSTIC_INFORMATION = 0x14
READ_DTC_INFORMATION = 0x19
INPUT_OUTPUT_CONTROL_BY_IDENTIFIER = 0x2F
ROUTINE_CONTROL = 0x31
REQUEST_DOWNLOAD = 0x34
REQUEST_UPLOAD = 0x35
TRANSFER_DATA = 0x36
REQUEST_TRANSFER_EXIT = 0x37
class SESSION_TYPE(IntEnum):
DEFAULT = 1
PROGRAMMING = 2
EXTENDED_DIAGNOSTIC = 3
SAFETY_SYSTEM_DIAGNOSTIC = 4
class RESET_TYPE(IntEnum):
HARD = 1
KEY_OFF_ON = 2
SOFT = 3
ENABLE_RAPID_POWER_SHUTDOWN = 4
DISABLE_RAPID_POWER_SHUTDOWN = 5
class ACCESS_TYPE(IntEnum):
REQUEST_SEED = 1
SEND_KEY = 2
class CONTROL_TYPE(IntEnum):
ENABLE_RX_ENABLE_TX = 0
ENABLE_RX_DISABLE_TX = 1
DISABLE_RX_ENABLE_TX = 2
DISABLE_RX_DISABLE_TX = 3
class MESSAGE_TYPE(IntEnum):
NORMAL = 1
NETWORK_MANAGEMENT = 2
NORMAL_AND_NETWORK_MANAGEMENT = 3
class TIMING_PARAMETER_TYPE(IntEnum):
READ_EXTENDED_SET = 1
SET_TO_DEFAULT_VALUES = 2
READ_CURRENTLY_ACTIVE = 3
SET_TO_GIVEN_VALUES = 4
class DTC_SETTING_TYPE(IntEnum):
ON = 1
OFF = 2
class RESPONSE_EVENT_TYPE(IntEnum):
STOP_RESPONSE_ON_EVENT = 0
ON_DTC_STATUS_CHANGE = 1
ON_TIMER_INTERRUPT = 2
ON_CHANGE_OF_DATA_IDENTIFIER = 3
REPORT_ACTIVATED_EVENTS = 4
START_RESPONSE_ON_EVENT = 5
CLEAR_RESPONSE_ON_EVENT = 6
ON_COMPARISON_OF_VALUES = 7
class LINK_CONTROL_TYPE(IntEnum):
VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE = 1
VERIFY_BAUDRATE_TRANSITION_WITH_SPECIFIC_BAUDRATE = 2
TRANSITION_BAUDRATE = 3
class BAUD_RATE_TYPE(IntEnum):
PC9600 = 1
PC19200 = 2
PC38400 = 3
PC57600 = 4
PC115200 = 5
CAN125000 = 16
CAN250000 = 17
CAN500000 = 18
CAN1000000 = 19
class DATA_IDENTIFIER_TYPE(IntEnum):
BOOT_SOFTWARE_IDENTIFICATION = 0xF180
APPLICATION_SOFTWARE_IDENTIFICATION = 0xF181
APPLICATION_DATA_IDENTIFICATION = 0xF182
BOOT_SOFTWARE_FINGERPRINT = 0xF183
APPLICATION_SOFTWARE_FINGERPRINT = 0xF184
APPLICATION_DATA_FINGERPRINT = 0xF185
ACTIVE_DIAGNOSTIC_SESSION = 0xF186
VEHICLE_MANUFACTURER_SPARE_PART_NUMBER = 0xF187
VEHICLE_MANUFACTURER_ECU_SOFTWARE_NUMBER = 0xF188
VEHICLE_MANUFACTURER_ECU_SOFTWARE_VERSION_NUMBER = 0xF189
SYSTEM_SUPPLIER_IDENTIFIER = 0xF18A
ECU_MANUFACTURING_DATE = 0xF18B
ECU_SERIAL_NUMBER = 0xF18C
SUPPORTED_FUNCTIONAL_UNITS = 0xF18D
VEHICLE_MANUFACTURER_KIT_ASSEMBLY_PART_NUMBER = 0xF18E
VIN = 0xF190
VEHICLE_MANUFACTURER_ECU_HARDWARE_NUMBER = 0xF191
SYSTEM_SUPPLIER_ECU_HARDWARE_NUMBER = 0xF192
SYSTEM_SUPPLIER_ECU_HARDWARE_VERSION_NUMBER = 0xF193
SYSTEM_SUPPLIER_ECU_SOFTWARE_NUMBER = 0xF194
SYSTEM_SUPPLIER_ECU_SOFTWARE_VERSION_NUMBER = 0xF195
EXHAUST_REGULATION_OR_TYPE_APPROVAL_NUMBER = 0xF196
SYSTEM_NAME_OR_ENGINE_TYPE = 0xF197
REPAIR_SHOP_CODE_OR_TESTER_SERIAL_NUMBER = 0xF198
PROGRAMMING_DATE = 0xF199
CALIBRATION_REPAIR_SHOP_CODE_OR_CALIBRATION_EQUIPMENT_SERIAL_NUMBER = 0xF19A
CALIBRATION_DATE = 0xF19B
CALIBRATION_EQUIPMENT_SOFTWARE_NUMBER = 0xF19C
ECU_INSTALLATION_DATE = 0xF19D
ODX_FILE = 0xF19E
ENTITY = 0xF19F
class TRANSMISSION_MODE_TYPE(IntEnum):
SEND_AT_SLOW_RATE = 1
SEND_AT_MEDIUM_RATE = 2
SEND_AT_FAST_RATE = 3
STOP_SENDING = 4
class DYNAMIC_DEFINITION_TYPE(IntEnum):
DEFINE_BY_IDENTIFIER = 1
DEFINE_BY_MEMORY_ADDRESS = 2
CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER = 3
class ISOTP_FRAME_TYPE(IntEnum):
SINGLE = 0
FIRST = 1
CONSECUTIVE = 2
FLOW = 3
class DynamicSourceDefinition(NamedTuple):
data_identifier: int
position: int
memory_size: int
memory_address: int
class DTC_GROUP_TYPE(IntEnum):
EMISSIONS = 0x000000
ALL = 0xFFFFFF
class DTC_REPORT_TYPE(IntEnum):
NUMBER_OF_DTC_BY_STATUS_MASK = 0x01
DTC_BY_STATUS_MASK = 0x02
DTC_SNAPSHOT_IDENTIFICATION = 0x03
DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER = 0x04
DTC_SNAPSHOT_RECORD_BY_RECORD_NUMBER = 0x05
DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER = 0x06
NUMBER_OF_DTC_BY_SEVERITY_MASK_RECORD = 0x07
DTC_BY_SEVERITY_MASK_RECORD = 0x08
SEVERITY_INFORMATION_OF_DTC = 0x09
SUPPORTED_DTC = 0x0A
FIRST_TEST_FAILED_DTC = 0x0B
FIRST_CONFIRMED_DTC = 0x0C
MOST_RECENT_TEST_FAILED_DTC = 0x0D
MOST_RECENT_CONFIRMED_DTC = 0x0E
MIRROR_MEMORY_DTC_BY_STATUS_MASK = 0x0F
MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER = 0x10
NUMBER_OF_MIRROR_MEMORY_DTC_BY_STATUS_MASK = 0x11
NUMBER_OF_EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK = 0x12
EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK = 0x13
DTC_FAULT_DETECTION_COUNTER = 0x14
DTC_WITH_PERMANENT_STATUS = 0x15
class DTC_STATUS_MASK_TYPE(IntEnum):
TEST_FAILED = 0x01
TEST_FAILED_THIS_OPERATION_CYCLE = 0x02
PENDING_DTC = 0x04
CONFIRMED_DTC = 0x08
TEST_NOT_COMPLETED_SINCE_LAST_CLEAR = 0x10
TEST_FAILED_SINCE_LAST_CLEAR = 0x20
TEST_NOT_COMPLETED_THIS_OPERATION_CYCLE = 0x40
WARNING_INDICATOR_REQUESTED = 0x80
ALL = 0xFF
class DTC_SEVERITY_MASK_TYPE(IntEnum):
MAINTENANCE_ONLY = 0x20
CHECK_AT_NEXT_HALT = 0x40
CHECK_IMMEDIATELY = 0x80
ALL = 0xE0
class CONTROL_PARAMETER_TYPE(IntEnum):
RETURN_CONTROL_TO_ECU = 0
RESET_TO_DEFAULT = 1
FREEZE_CURRENT_STATE = 2
SHORT_TERM_ADJUSTMENT = 3
class ROUTINE_CONTROL_TYPE(IntEnum):
START = 1
STOP = 2
REQUEST_RESULTS = 3
class ROUTINE_IDENTIFIER_TYPE(IntEnum):
ERASE_MEMORY = 0xFF00
CHECK_PROGRAMMING_DEPENDENCIES = 0xFF01
ERASE_MIRROR_MEMORY_DTCS = 0xFF02
class MessageTimeoutError(Exception):
pass
class NegativeResponseError(Exception):
def __init__(self, message, service_id, error_code):
super().__init__()
self.message = message
self.service_id = service_id
self.error_code = error_code
def __str__(self):
return self.message
class InvalidServiceIdError(Exception):
pass
class InvalidSubFunctionError(Exception):
pass
class InvalidSubAddressError(Exception):
pass
_negative_response_codes = {
0x00: 'positive response',
0x10: 'general reject',
0x11: 'service not supported',
0x12: 'sub-function not supported',
0x13: 'incorrect message length or invalid format',
0x14: 'response too long',
0x21: 'busy repeat request',
0x22: 'conditions not correct',
0x24: 'request sequence error',
0x25: 'no response from subnet component',
0x26: 'failure prevents execution of requested action',
0x31: 'request out of range',
0x33: 'security access denied',
0x35: 'invalid key',
0x36: 'exceed number of attempts',
0x37: 'required time delay not expired',
0x70: 'upload download not accepted',
0x71: 'transfer data suspended',
0x72: 'general programming failure',
0x73: 'wrong block sequence counter',
0x78: 'request correctly received - response pending',
0x7e: 'sub-function not supported in active session',
0x7f: 'service not supported in active session',
0x81: 'rpm too high',
0x82: 'rpm too low',
0x83: 'engine is running',
0x84: 'engine is not running',
0x85: 'engine run time too low',
0x86: 'temperature too high',
0x87: 'temperature too low',
0x88: 'vehicle speed too high',
0x89: 'vehicle speed too low',
0x8a: 'throttle/pedal too high',
0x8b: 'throttle/pedal too low',
0x8c: 'transmission not in neutral',
0x8d: 'transmission not in gear',
0x8f: 'brake switch(es) not closed',
0x90: 'shifter lever not in park',
0x91: 'torque converter clutch locked',
0x92: 'voltage too high',
0x93: 'voltage too low',
}
def get_dtc_num_as_str(dtc_num_bytes):
# ISO 15031-6
designator = {
0b00: "P",
0b01: "C",
0b10: "B",
0b11: "U",
}
d = designator[dtc_num_bytes[0] >> 6]
n = bytes([dtc_num_bytes[0] & 0x3F]) + dtc_num_bytes[1:]
return d + n.hex()
def get_dtc_status_names(status):
result = list()
for m in DTC_STATUS_MASK_TYPE:
if m == DTC_STATUS_MASK_TYPE.ALL:
continue
if status & m.value:
result.append(m.name)
return result
class CanClient():
def __init__(self, can_send: Callable[[int, bytes, int], None], can_recv: Callable[[], List[Tuple[int, int, bytes, int]]],
tx_addr: int, rx_addr: int, bus: int, sub_addr: Optional[int] = None, debug: bool = False):
self.tx = can_send
self.rx = can_recv
self.tx_addr = tx_addr
self.rx_addr = rx_addr
self.rx_buff: Deque[bytes] = deque()
self.sub_addr = sub_addr
self.bus = bus
self.debug = debug
def _recv_filter(self, bus: int, addr: int) -> bool:
# handle functional addresses (switch to first addr to respond)
if self.tx_addr == 0x7DF:
is_response = addr >= 0x7E8 and addr <= 0x7EF
if is_response:
if self.debug:
print(f"switch to physical addr {hex(addr)}")
self.tx_addr = addr - 8
self.rx_addr = addr
return is_response
if self.tx_addr == 0x18DB33F1:
is_response = addr >= 0x18DAF100 and addr <= 0x18DAF1FF
if is_response:
if self.debug:
print(f"switch to physical addr {hex(addr)}")
self.tx_addr = 0x18DA00F1 + (addr << 8 & 0xFF00)
self.rx_addr = addr
return bus == self.bus and addr == self.rx_addr
def _recv_buffer(self, drain: bool = False) -> None:
while True:
msgs = self.rx()
if drain:
if self.debug:
print("CAN-RX: drain - {}".format(len(msgs)))
self.rx_buff.clear()
else:
for rx_addr, _, rx_data, rx_bus in msgs or []:
if self._recv_filter(rx_bus, rx_addr) and len(rx_data) > 0:
rx_data = bytes(rx_data) # convert bytearray to bytes
if self.debug:
print(f"CAN-RX: {hex(rx_addr)} - 0x{bytes.hex(rx_data)}")
# Cut off sub addr in first byte
if self.sub_addr is not None:
if rx_data[0] != self.sub_addr:
raise InvalidSubAddressError(f"isotp - rx: invalid sub-address: {rx_data[0]}, expected: {self.sub_addr}")
rx_data = rx_data[1:]
self.rx_buff.append(rx_data)
# break when non-full buffer is processed
if len(msgs) < 254:
return
def recv(self, drain: bool = False) -> Generator[bytes, None, None]:
# buffer rx messages in case two response messages are received at once
# (e.g. response pending and success/failure response)
self._recv_buffer(drain)
try:
while True:
yield self.rx_buff.popleft()
except IndexError:
pass # empty
def send(self, msgs: List[bytes], delay: float = 0) -> None:
for i, msg in enumerate(msgs):
if delay and i != 0:
if self.debug:
print(f"CAN-TX: delay - {delay}")
time.sleep(delay)
if self.sub_addr is not None:
msg = bytes([self.sub_addr]) + msg
if self.debug:
print(f"CAN-TX: {hex(self.tx_addr)} - 0x{bytes.hex(msg)}")
assert len(msg) <= 8
self.tx(self.tx_addr, msg, self.bus)
# prevent rx buffer from overflowing on large tx
if i % 10 == 9:
self._recv_buffer()
class IsoTpMessage():
def __init__(self, can_client: CanClient, timeout: float = 1, single_frame_mode: bool = False, separation_time: float = 0,
debug: bool = False, max_len: int = 8):
self._can_client = can_client
self.timeout = timeout
self.single_frame_mode = single_frame_mode
self.debug = debug
self.max_len = max_len
# <= 127, separation time in milliseconds
# 0xF1 to 0xF9 UF, 100 to 900 microseconds
if 1e-4 <= separation_time <= 9e-4:
offset = int(round(separation_time, 4) * 1e4) - 1
separation_time = 0xF1 + offset
elif 0 <= separation_time <= 0.127:
separation_time = round(separation_time * 1000)
else:
raise Exception("Separation time not in range")
self.flow_control_msg = bytes([
0x30, # flow control
0x01 if self.single_frame_mode else 0x00, # block size
separation_time,
]).ljust(self.max_len, b"\x00")
def send(self, dat: bytes, setup_only: bool = False) -> None:
# throw away any stale data
self._can_client.recv(drain=True)
self.tx_dat = dat
self.tx_len = len(dat)
self.tx_idx = 0
self.tx_done = False
self.rx_dat = b""
self.rx_len = 0
self.rx_idx = 0
self.rx_done = False
if self.debug and not setup_only:
print(f"ISO-TP: REQUEST - {hex(self._can_client.tx_addr)} 0x{bytes.hex(self.tx_dat)}")
self._tx_first_frame(setup_only=setup_only)
def _tx_first_frame(self, setup_only: bool = False) -> None:
if self.tx_len < self.max_len:
# single frame (send all bytes)
if self.debug and not setup_only:
print(f"ISO-TP: TX - single frame - {hex(self._can_client.tx_addr)}")
msg = (bytes([self.tx_len]) + self.tx_dat).ljust(self.max_len, b"\x00")
self.tx_done = True
else:
# first frame (send first 6 bytes)
if self.debug and not setup_only:
print(f"ISO-TP: TX - first frame - {hex(self._can_client.tx_addr)}")
msg = (struct.pack("!H", 0x1000 | self.tx_len) + self.tx_dat[:self.max_len - 2]).ljust(self.max_len - 2, b"\x00")
if not setup_only:
self._can_client.send([msg])
def recv(self, timeout=None) -> Tuple[Optional[bytes], bool]:
if timeout is None:
timeout = self.timeout
start_time = time.monotonic()
rx_in_progress = False
try:
while True:
for msg in self._can_client.recv():
frame_type = self._isotp_rx_next(msg)
start_time = time.monotonic()
rx_in_progress = frame_type == ISOTP_FRAME_TYPE.CONSECUTIVE
if self.tx_done and self.rx_done:
return self.rx_dat, False
# no timeout indicates non-blocking
if timeout == 0:
return None, rx_in_progress
if time.monotonic() - start_time > timeout:
raise MessageTimeoutError("timeout waiting for response")
finally:
if self.debug and self.rx_dat:
print(f"ISO-TP: RESPONSE - {hex(self._can_client.rx_addr)} 0x{bytes.hex(self.rx_dat)}")
def _isotp_rx_next(self, rx_data: bytes) -> ISOTP_FRAME_TYPE:
# TODO: Handle CAN frame data optimization, which is allowed with some frame types
# # ISO 15765-2 specifies an eight byte CAN frame for ISO-TP communication
# assert len(rx_data) == self.max_len, f"isotp - rx: invalid CAN frame length: {len(rx_data)}"
if rx_data[0] >> 4 == ISOTP_FRAME_TYPE.SINGLE:
self.rx_len = rx_data[0] & 0x0F
assert self.rx_len < self.max_len, f"isotp - rx: invalid single frame length: {self.rx_len}"
self.rx_dat = rx_data[1:1 + self.rx_len]
self.rx_idx = 0
self.rx_done = True
if self.debug:
print(f"ISO-TP: RX - single frame - {hex(self._can_client.rx_addr)} idx={self.rx_idx} done={self.rx_done}")
return ISOTP_FRAME_TYPE.SINGLE
elif rx_data[0] >> 4 == ISOTP_FRAME_TYPE.FIRST:
self.rx_len = ((rx_data[0] & 0x0F) << 8) + rx_data[1]
assert self.max_len <= self.rx_len, f"isotp - rx: invalid first frame length: {self.rx_len}"
self.rx_dat = rx_data[2:]
self.rx_idx = 0
self.rx_done = False
if self.debug:
print(f"ISO-TP: RX - first frame - {hex(self._can_client.rx_addr)} idx={self.rx_idx} done={self.rx_done}")
if self.debug:
print(f"ISO-TP: TX - flow control continue - {hex(self._can_client.tx_addr)}")
# send flow control message
self._can_client.send([self.flow_control_msg])
return ISOTP_FRAME_TYPE.FIRST
elif rx_data[0] >> 4 == ISOTP_FRAME_TYPE.CONSECUTIVE:
assert not self.rx_done, "isotp - rx: consecutive frame with no active frame"
self.rx_idx += 1
assert self.rx_idx & 0xF == rx_data[0] & 0xF, "isotp - rx: invalid consecutive frame index"
rx_size = self.rx_len - len(self.rx_dat)
self.rx_dat += rx_data[1:1 + rx_size]
if self.rx_len == len(self.rx_dat):
self.rx_done = True
elif self.single_frame_mode:
# notify ECU to send next frame
self._can_client.send([self.flow_control_msg])
if self.debug:
print(f"ISO-TP: RX - consecutive frame - {hex(self._can_client.rx_addr)} idx={self.rx_idx} done={self.rx_done}")
return ISOTP_FRAME_TYPE.CONSECUTIVE
elif rx_data[0] >> 4 == ISOTP_FRAME_TYPE.FLOW:
assert not self.tx_done, "isotp - rx: flow control with no active frame"
assert rx_data[0] != 0x32, "isotp - rx: flow-control overflow/abort"
assert rx_data[0] == 0x30 or rx_data[0] == 0x31, "isotp - rx: flow-control transfer state indicator invalid"
if rx_data[0] == 0x30:
if self.debug:
print(f"ISO-TP: RX - flow control continue - {hex(self._can_client.tx_addr)}")
delay_ts = rx_data[2] & 0x7F
# scale is 1 milliseconds if first bit == 0, 100 micro seconds if first bit == 1
delay_div = 1000. if rx_data[2] & 0x80 == 0 else 10000.
delay_sec = delay_ts / delay_div
# first frame = 6 bytes, each consecutive frame = 7 bytes
num_bytes = self.max_len - 1
start = self.max_len - 2 + self.tx_idx * num_bytes
count = rx_data[1]
end = start + count * num_bytes if count > 0 else self.tx_len
tx_msgs = []
for i in range(start, end, num_bytes):
self.tx_idx += 1
# consecutive tx messages
msg = (bytes([0x20 | (self.tx_idx & 0xF)]) + self.tx_dat[i:i + num_bytes]).ljust(self.max_len, b"\x00")
tx_msgs.append(msg)
# send consecutive tx messages
self._can_client.send(tx_msgs, delay=delay_sec)
if end >= self.tx_len:
self.tx_done = True
if self.debug:
print(f"ISO-TP: TX - consecutive frame - {hex(self._can_client.tx_addr)} idx={self.tx_idx} done={self.tx_done}")
elif rx_data[0] == 0x31:
# wait (do nothing until next flow control message)
if self.debug:
print(f"ISO-TP: TX - flow control wait - {hex(self._can_client.tx_addr)}")
return ISOTP_FRAME_TYPE.FLOW
# 4-15 - reserved
else:
raise Exception(f"isotp - rx: invalid frame type: {rx_data[0] >> 4}")
FUNCTIONAL_ADDRS = [0x7DF, 0x18DB33F1]
def get_rx_addr_for_tx_addr(tx_addr, rx_offset=0x8):
if tx_addr in FUNCTIONAL_ADDRS:
return None
if tx_addr < 0xFFF8:
# pseudo-standard 11 bit response addr (add 8) works for most manufacturers
# allow override; some manufacturers use other offsets for non-OBD2 access
return tx_addr + rx_offset
if tx_addr > 0x10000000 and tx_addr < 0xFFFFFFFF:
# standard 29 bit response addr (flip last two bytes)
return (tx_addr & 0xFFFF0000) + (tx_addr << 8 & 0xFF00) + (tx_addr >> 8 & 0xFF)
raise ValueError("invalid tx_addr: {}".format(tx_addr))
class UdsClient():
def __init__(self, panda, tx_addr: int, rx_addr: Optional[int] = None, bus: int = 0, sub_addr: Optional[int] = None, timeout: float = 1,
debug: bool = False, tx_timeout: float = 1, response_pending_timeout: float = 10):
self.bus = bus
self.tx_addr = tx_addr
self.rx_addr = rx_addr if rx_addr is not None else get_rx_addr_for_tx_addr(tx_addr)
self.sub_addr = sub_addr
self.timeout = timeout
self.debug = debug
can_send_with_timeout = partial(panda.can_send, timeout=int(tx_timeout*1000))
self._can_client = CanClient(can_send_with_timeout, panda.can_recv, self.tx_addr, self.rx_addr, self.bus, self.sub_addr, debug=self.debug)
self.response_pending_timeout = response_pending_timeout
# generic uds request
def _uds_request(self, service_type: SERVICE_TYPE, subfunction: Optional[int] = None, data: Optional[bytes] = None) -> bytes:
req = bytes([service_type])
if subfunction is not None:
req += bytes([subfunction])
if data is not None:
req += data
# send request, wait for response
max_len = 8 if self.sub_addr is None else 7
isotp_msg = IsoTpMessage(self._can_client, timeout=self.timeout, debug=self.debug, max_len=max_len)
isotp_msg.send(req)
response_pending = False
while True:
timeout = self.response_pending_timeout if response_pending else self.timeout
resp, _ = isotp_msg.recv(timeout)
if resp is None:
continue
response_pending = False
resp_sid = resp[0] if len(resp) > 0 else None
# negative response
if resp_sid == 0x7F:
service_id = resp[1] if len(resp) > 1 else -1
try:
service_desc = SERVICE_TYPE(service_id).name
except BaseException:
service_desc = 'NON_STANDARD_SERVICE'
error_code = resp[2] if len(resp) > 2 else -1
try:
error_desc = _negative_response_codes[error_code]
except BaseException:
error_desc = resp[3:].hex()
# wait for another message if response pending
if error_code == 0x78:
response_pending = True
if self.debug:
print("UDS-RX: response pending")
continue
raise NegativeResponseError('{} - {}'.format(service_desc, error_desc), service_id, error_code)
# positive response
if service_type + 0x40 != resp_sid:
resp_sid_hex = hex(resp_sid) if resp_sid is not None else None
raise InvalidServiceIdError('invalid response service id: {}'.format(resp_sid_hex))
if subfunction is not None:
resp_sfn = resp[1] if len(resp) > 1 else None
if subfunction != resp_sfn:
resp_sfn_hex = hex(resp_sfn) if resp_sfn is not None else None
raise InvalidSubFunctionError(f'invalid response subfunction: {resp_sfn_hex}')
# return data (exclude service id and sub-function id)
return resp[(1 if subfunction is None else 2):]
# services
def diagnostic_session_control(self, session_type: SESSION_TYPE):
self._uds_request(SERVICE_TYPE.DIAGNOSTIC_SESSION_CONTROL, subfunction=session_type)
def ecu_reset(self, reset_type: RESET_TYPE):
resp = self._uds_request(SERVICE_TYPE.ECU_RESET, subfunction=reset_type)
power_down_time = None
if reset_type == RESET_TYPE.ENABLE_RAPID_POWER_SHUTDOWN:
power_down_time = resp[0]
return power_down_time
def security_access(self, access_type: ACCESS_TYPE, security_key: bytes = b'', data_record: bytes = b''):
request_seed = access_type % 2 != 0
if request_seed and len(security_key) != 0:
raise ValueError('security_key not allowed')
if not request_seed and len(security_key) == 0:
raise ValueError('security_key is missing')
if not request_seed and len(data_record) != 0:
raise ValueError('data_record not allowed')
data = security_key + data_record
resp = self._uds_request(SERVICE_TYPE.SECURITY_ACCESS, subfunction=access_type, data=data)
if request_seed:
security_seed = resp
return security_seed
def communication_control(self, control_type: CONTROL_TYPE, message_type: MESSAGE_TYPE):
data = bytes([message_type])
self._uds_request(SERVICE_TYPE.COMMUNICATION_CONTROL, subfunction=control_type, data=data)
def tester_present(self, ):
self._uds_request(SERVICE_TYPE.TESTER_PRESENT, subfunction=0x00)
def access_timing_parameter(self, timing_parameter_type: TIMING_PARAMETER_TYPE, parameter_values: Optional[bytes] = None):
write_custom_values = timing_parameter_type == TIMING_PARAMETER_TYPE.SET_TO_GIVEN_VALUES
read_values = (timing_parameter_type == TIMING_PARAMETER_TYPE.READ_CURRENTLY_ACTIVE or
timing_parameter_type == TIMING_PARAMETER_TYPE.READ_EXTENDED_SET)
if not write_custom_values and parameter_values is not None:
raise ValueError('parameter_values not allowed')
if write_custom_values and parameter_values is None:
raise ValueError('parameter_values is missing')
resp = self._uds_request(SERVICE_TYPE.ACCESS_TIMING_PARAMETER, subfunction=timing_parameter_type, data=parameter_values)
if read_values:
# TODO: parse response into values?
parameter_values = resp
return parameter_values
def secured_data_transmission(self, data: bytes):
# TODO: split data into multiple input parameters?
resp = self._uds_request(SERVICE_TYPE.SECURED_DATA_TRANSMISSION, subfunction=None, data=data)
# TODO: parse response into multiple output values?
return resp
def control_dtc_setting(self, dtc_setting_type: DTC_SETTING_TYPE):
self._uds_request(SERVICE_TYPE.CONTROL_DTC_SETTING, subfunction=dtc_setting_type)
def response_on_event(self, response_event_type: RESPONSE_EVENT_TYPE, store_event: bool, window_time: int,
event_type_record: int, service_response_record: int):
if store_event:
response_event_type |= 0x20 # type: ignore
# TODO: split record parameters into arrays
data = bytes([window_time, event_type_record, service_response_record])
resp = self._uds_request(SERVICE_TYPE.RESPONSE_ON_EVENT, subfunction=response_event_type, data=data)
if response_event_type == RESPONSE_EVENT_TYPE.REPORT_ACTIVATED_EVENTS:
return {
"num_of_activated_events": resp[0],
"data": resp[1:], # TODO: parse the reset of response
}
return {
"num_of_identified_events": resp[0],
"event_window_time": resp[1],
"data": resp[2:], # TODO: parse the reset of response
}
def link_control(self, link_control_type: LINK_CONTROL_TYPE, baud_rate_type: Optional[BAUD_RATE_TYPE] = None):
data: Optional[bytes]
if link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE:
# baud_rate_type = BAUD_RATE_TYPE
data = bytes([cast(int, baud_rate_type)])
elif link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_SPECIFIC_BAUDRATE:
# baud_rate_type = custom value (3 bytes big-endian)
data = struct.pack('!I', baud_rate_type)[1:]
else:
data = None
self._uds_request(SERVICE_TYPE.LINK_CONTROL, subfunction=link_control_type, data=data)
def read_data_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE):
# TODO: support list of identifiers
data = struct.pack('!H', data_identifier_type)
resp = self._uds_request(SERVICE_TYPE.READ_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {} expected: {}'.format(hex(resp_id), hex(data_identifier_type)))
return resp[2:]
def read_memory_by_address(self, memory_address: int, memory_size: int, memory_address_bytes: int = 4, memory_size_bytes: int = 1):
if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data = bytes([memory_size_bytes << 4 | memory_address_bytes])
if memory_address >= 1 << (memory_address_bytes * 8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
data += struct.pack('!I', memory_address)[4 - memory_address_bytes:]
if memory_size >= 1 << (memory_size_bytes * 8):
raise ValueError('invalid memory_size: {}'.format(memory_size))
data += struct.pack('!I', memory_size)[4 - memory_size_bytes:]
resp = self._uds_request(SERVICE_TYPE.READ_MEMORY_BY_ADDRESS, subfunction=None, data=data)
return resp
def read_scaling_data_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE):
data = struct.pack('!H', data_identifier_type)
resp = self._uds_request(SERVICE_TYPE.READ_SCALING_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
return resp[2:] # TODO: parse the response
def read_data_by_periodic_identifier(self, transmission_mode_type: TRANSMISSION_MODE_TYPE, periodic_data_identifier: int):
# TODO: support list of identifiers
data = bytes([transmission_mode_type, periodic_data_identifier])
self._uds_request(SERVICE_TYPE.READ_DATA_BY_PERIODIC_IDENTIFIER, subfunction=None, data=data)
def dynamically_define_data_identifier(self, dynamic_definition_type: DYNAMIC_DEFINITION_TYPE, dynamic_data_identifier: int,
source_definitions: List[DynamicSourceDefinition], memory_address_bytes: int = 4, memory_size_bytes: int = 1):
if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data = struct.pack('!H', dynamic_data_identifier)
if dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_IDENTIFIER:
for s in source_definitions:
data += struct.pack('!H', s.data_identifier) + bytes([s.position, s.memory_size])
elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_MEMORY_ADDRESS:
data += bytes([memory_size_bytes << 4 | memory_address_bytes])
for s in source_definitions:
if s.memory_address >= 1 << (memory_address_bytes * 8):
raise ValueError('invalid memory_address: {}'.format(s.memory_address))
data += struct.pack('!I', s.memory_address)[4 - memory_address_bytes:]
if s.memory_size >= 1 << (memory_size_bytes * 8):
raise ValueError('invalid memory_size: {}'.format(s.memory_size))
data += struct.pack('!I', s.memory_size)[4 - memory_size_bytes:]
elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER:
pass
else:
raise ValueError('invalid dynamic identifier type: {}'.format(hex(dynamic_definition_type)))
self._uds_request(SERVICE_TYPE.DYNAMICALLY_DEFINE_DATA_IDENTIFIER, subfunction=dynamic_definition_type, data=data)
def write_data_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE, data_record: bytes):
data = struct.pack('!H', data_identifier_type) + data_record
resp = self._uds_request(SERVICE_TYPE.WRITE_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
def write_memory_by_address(self, memory_address: int, memory_size: int, data_record: bytes, memory_address_bytes: int = 4, memory_size_bytes: int = 1):
if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data = bytes([memory_size_bytes << 4 | memory_address_bytes])
if memory_address >= 1 << (memory_address_bytes * 8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
data += struct.pack('!I', memory_address)[4 - memory_address_bytes:]
if memory_size >= 1 << (memory_size_bytes * 8):
raise ValueError('invalid memory_size: {}'.format(memory_size))
data += struct.pack('!I', memory_size)[4 - memory_size_bytes:]
data += data_record
self._uds_request(SERVICE_TYPE.WRITE_MEMORY_BY_ADDRESS, subfunction=0x00, data=data)
def clear_diagnostic_information(self, dtc_group_type: DTC_GROUP_TYPE):
data = struct.pack('!I', dtc_group_type)[1:] # 3 bytes
self._uds_request(SERVICE_TYPE.CLEAR_DIAGNOSTIC_INFORMATION, subfunction=None, data=data)
def read_dtc_information(self, dtc_report_type: DTC_REPORT_TYPE, dtc_status_mask_type: DTC_STATUS_MASK_TYPE = DTC_STATUS_MASK_TYPE.ALL,
dtc_severity_mask_type: DTC_SEVERITY_MASK_TYPE = DTC_SEVERITY_MASK_TYPE.ALL, dtc_mask_record: int = 0xFFFFFF,
dtc_snapshot_record_num: int = 0xFF, dtc_extended_record_num: int = 0xFF):
data = b''
# dtc_status_mask_type
if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_STATUS_MASK or \
dtc_report_type == DTC_REPORT_TYPE.DTC_BY_STATUS_MASK or \
dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_BY_STATUS_MASK or \
dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_MIRROR_MEMORY_DTC_BY_STATUS_MASK or \
dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK or \
dtc_report_type == DTC_REPORT_TYPE.EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK:
data += bytes([dtc_status_mask_type])
# dtc_mask_record
if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or \
dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.SEVERITY_INFORMATION_OF_DTC:
data += struct.pack('!I', dtc_mask_record)[1:] # 3 bytes
# dtc_snapshot_record_num
if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or \
dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_RECORD_NUMBER:
data += bytes([dtc_snapshot_record_num])
# dtc_extended_record_num
if dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER:
data += bytes([dtc_extended_record_num])
# dtc_severity_mask_type
if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_SEVERITY_MASK_RECORD or \
dtc_report_type == DTC_REPORT_TYPE.DTC_BY_SEVERITY_MASK_RECORD:
data += bytes([dtc_severity_mask_type, dtc_status_mask_type])
resp = self._uds_request(SERVICE_TYPE.READ_DTC_INFORMATION, subfunction=dtc_report_type, data=data)
# TODO: parse response
return resp
def input_output_control_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE, control_parameter_type: CONTROL_PARAMETER_TYPE,
control_option_record: bytes = b'', control_enable_mask_record: bytes = b''):
data = struct.pack('!H', data_identifier_type) + bytes([control_parameter_type]) + control_option_record + control_enable_mask_record
resp = self._uds_request(SERVICE_TYPE.INPUT_OUTPUT_CONTROL_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
return resp[2:]
def routine_control(self, routine_control_type: ROUTINE_CONTROL_TYPE, routine_identifier_type: ROUTINE_IDENTIFIER_TYPE, routine_option_record: bytes = b''):
data = struct.pack('!H', routine_identifier_type) + routine_option_record
resp = self._uds_request(SERVICE_TYPE.ROUTINE_CONTROL, subfunction=routine_control_type, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != routine_identifier_type:
raise ValueError('invalid response routine identifier: {}'.format(hex(resp_id)))
return resp[2:]
def request_download(self, memory_address: int, memory_size: int, memory_address_bytes: int = 4, memory_size_bytes: int = 4, data_format: int = 0x00):
data = bytes([data_format])
if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data += bytes([memory_size_bytes << 4 | memory_address_bytes])
if memory_address >= 1 << (memory_address_bytes * 8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
data += struct.pack('!I', memory_address)[4 - memory_address_bytes:]
if memory_size >= 1 << (memory_size_bytes * 8):
raise ValueError('invalid memory_size: {}'.format(memory_size))
data += struct.pack('!I', memory_size)[4 - memory_size_bytes:]
resp = self._uds_request(SERVICE_TYPE.REQUEST_DOWNLOAD, subfunction=None, data=data)
max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else 0
if max_num_bytes_len >= 1 and max_num_bytes_len <= 4:
max_num_bytes = struct.unpack('!I', (b"\x00" * (4 - max_num_bytes_len)) + resp[1:max_num_bytes_len + 1])[0]
else:
raise ValueError('invalid max_num_bytes_len: {}'.format(max_num_bytes_len))
return max_num_bytes # max number of bytes per transfer data request
def request_upload(self, memory_address: int, memory_size: int, memory_address_bytes: int = 4, memory_size_bytes: int = 4, data_format: int = 0x00):
data = bytes([data_format])
if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data += bytes([memory_size_bytes << 4 | memory_address_bytes])
if memory_address >= 1 << (memory_address_bytes * 8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
data += struct.pack('!I', memory_address)[4 - memory_address_bytes:]
if memory_size >= 1 << (memory_size_bytes * 8):
raise ValueError('invalid memory_size: {}'.format(memory_size))
data += struct.pack('!I', memory_size)[4 - memory_size_bytes:]
resp = self._uds_request(SERVICE_TYPE.REQUEST_UPLOAD, subfunction=None, data=data)
max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else 0
if max_num_bytes_len >= 1 and max_num_bytes_len <= 4:
max_num_bytes = struct.unpack('!I', (b"\x00" * (4 - max_num_bytes_len)) + resp[1:max_num_bytes_len + 1])[0]
else:
raise ValueError('invalid max_num_bytes_len: {}'.format(max_num_bytes_len))
return max_num_bytes # max number of bytes per transfer data request
def transfer_data(self, block_sequence_count: int, data: bytes = b''):
data = bytes([block_sequence_count]) + data
resp = self._uds_request(SERVICE_TYPE.TRANSFER_DATA, subfunction=None, data=data)
resp_id = resp[0] if len(resp) > 0 else None
if resp_id != block_sequence_count:
raise ValueError('invalid block_sequence_count: {}'.format(resp_id))
return resp[1:]
def request_transfer_exit(self):
self._uds_request(SERVICE_TYPE.REQUEST_TRANSFER_EXIT, subfunction=None)

98
panda/python/usb.py Normal file
View File

@@ -0,0 +1,98 @@
import struct
from .base import BaseHandle, BaseSTBootloaderHandle, TIMEOUT
from .constants import McuType
class PandaUsbHandle(BaseHandle):
def __init__(self, libusb_handle):
self._libusb_handle = libusb_handle
def close(self):
self._libusb_handle.close()
def controlWrite(self, request_type: int, request: int, value: int, index: int, data, timeout: int = TIMEOUT, expect_disconnect: bool = False):
return self._libusb_handle.controlWrite(request_type, request, value, index, data, timeout)
def controlRead(self, request_type: int, request: int, value: int, index: int, length: int, timeout: int = TIMEOUT):
return self._libusb_handle.controlRead(request_type, request, value, index, length, timeout)
def bulkWrite(self, endpoint: int, data: bytes, timeout: int = TIMEOUT) -> int:
return self._libusb_handle.bulkWrite(endpoint, data, timeout) # type: ignore
def bulkRead(self, endpoint: int, length: int, timeout: int = TIMEOUT) -> bytes:
return self._libusb_handle.bulkRead(endpoint, length, timeout) # type: ignore
class STBootloaderUSBHandle(BaseSTBootloaderHandle):
DFU_DNLOAD = 1
DFU_UPLOAD = 2
DFU_GETSTATUS = 3
DFU_CLRSTATUS = 4
DFU_ABORT = 6
def __init__(self, libusb_device, libusb_handle):
self._libusb_handle = libusb_handle
# example from F4: lsusb -v | grep Flash
# iInterface 4 @Internal Flash /0x08000000/04*016Kg,01*064Kg,011*128Kg
for i in range(20):
desc = libusb_handle.getStringDescriptor(i, 0)
if desc is not None and desc.startswith("@Internal Flash"):
sector_count = sum([int(s.split('*')[0]) for s in desc.split('/')[-1].split(',')])
break
mcu_by_sector_count = {m.config.sector_count: m for m in McuType}
assert sector_count in mcu_by_sector_count, f"Unkown MCU: {sector_count=}"
self._mcu_type = mcu_by_sector_count[sector_count]
def _status(self) -> None:
while 1:
dat = self._libusb_handle.controlRead(0x21, self.DFU_GETSTATUS, 0, 0, 6)
if dat[1] == 0:
break
def _erase_page_address(self, address: int) -> None:
self._libusb_handle.controlWrite(0x21, self.DFU_DNLOAD, 0, 0, b"\x41" + struct.pack("I", address))
self._status()
def get_mcu_type(self):
return self._mcu_type
def erase_sector(self, sector: int):
self._erase_page_address(self._mcu_type.config.sector_address(sector))
def clear_status(self):
# Clear status
stat = self._libusb_handle.controlRead(0x21, self.DFU_GETSTATUS, 0, 0, 6)
if stat[4] == 0xa:
self._libusb_handle.controlRead(0x21, self.DFU_CLRSTATUS, 0, 0, 0)
elif stat[4] == 0x9:
self._libusb_handle.controlWrite(0x21, self.DFU_ABORT, 0, 0, b"")
self._status()
stat = str(self._libusb_handle.controlRead(0x21, self.DFU_GETSTATUS, 0, 0, 6))
def close(self):
self._libusb_handle.close()
def program(self, address, dat):
# Set Address Pointer
self._libusb_handle.controlWrite(0x21, self.DFU_DNLOAD, 0, 0, b"\x21" + struct.pack("I", address))
self._status()
# Program
bs = min(len(dat), self._mcu_type.config.block_size)
dat += b"\xFF" * ((bs - len(dat)) % bs)
for i in range(len(dat) // bs):
ldat = dat[i * bs:(i + 1) * bs]
print("programming %d with length %d" % (i, len(ldat)))
self._libusb_handle.controlWrite(0x21, self.DFU_DNLOAD, 2 + i, 0, ldat)
self._status()
def jump(self, address):
self._libusb_handle.controlWrite(0x21, self.DFU_DNLOAD, 0, 0, b"\x21" + struct.pack("I", address))
self._status()
try:
self._libusb_handle.controlWrite(0x21, self.DFU_DNLOAD, 2, 0, b"")
_ = str(self._libusb_handle.controlRead(0x21, self.DFU_GETSTATUS, 0, 0, 6))
except Exception:
pass