wip
This commit is contained in:
@@ -1,7 +1,8 @@
|
||||
import time
|
||||
import struct
|
||||
from collections import deque
|
||||
from typing import Callable, NamedTuple, Tuple, List, Deque, Generator, Optional, cast
|
||||
from typing import NamedTuple, Deque, cast
|
||||
from collections.abc import Callable, Generator
|
||||
from enum import IntEnum
|
||||
from functools import partial
|
||||
|
||||
@@ -300,8 +301,8 @@ def get_dtc_status_names(status):
|
||||
return result
|
||||
|
||||
class CanClient():
|
||||
def __init__(self, can_send: Callable[[int, bytes, int], None], can_recv: Callable[[], List[Tuple[int, int, bytes, int]]],
|
||||
tx_addr: int, rx_addr: int, bus: int, sub_addr: Optional[int] = None, debug: bool = False):
|
||||
def __init__(self, can_send: Callable[[int, bytes, int], None], can_recv: Callable[[], list[tuple[int, int, bytes, int]]],
|
||||
tx_addr: int, rx_addr: int, bus: int, sub_addr: int | None = None, debug: bool = False):
|
||||
self.tx = can_send
|
||||
self.rx = can_recv
|
||||
self.tx_addr = tx_addr
|
||||
@@ -335,7 +336,7 @@ class CanClient():
|
||||
msgs = self.rx()
|
||||
if drain:
|
||||
if self.debug:
|
||||
print("CAN-RX: drain - {}".format(len(msgs)))
|
||||
print(f"CAN-RX: drain - {len(msgs)}")
|
||||
self.rx_buff.clear()
|
||||
else:
|
||||
for rx_addr, _, rx_data, rx_bus in msgs or []:
|
||||
@@ -366,7 +367,7 @@ class CanClient():
|
||||
except IndexError:
|
||||
pass # empty
|
||||
|
||||
def send(self, msgs: List[bytes], delay: float = 0) -> None:
|
||||
def send(self, msgs: list[bytes], delay: float = 0) -> None:
|
||||
for i, msg in enumerate(msgs):
|
||||
if delay and i != 0:
|
||||
if self.debug:
|
||||
@@ -443,7 +444,7 @@ class IsoTpMessage():
|
||||
if not setup_only:
|
||||
self._can_client.send([msg])
|
||||
|
||||
def recv(self, timeout=None) -> Tuple[Optional[bytes], bool]:
|
||||
def recv(self, timeout=None) -> tuple[bytes | None, bool]:
|
||||
if timeout is None:
|
||||
timeout = self.timeout
|
||||
|
||||
@@ -566,11 +567,11 @@ def get_rx_addr_for_tx_addr(tx_addr, rx_offset=0x8):
|
||||
# standard 29 bit response addr (flip last two bytes)
|
||||
return (tx_addr & 0xFFFF0000) + (tx_addr << 8 & 0xFF00) + (tx_addr >> 8 & 0xFF)
|
||||
|
||||
raise ValueError("invalid tx_addr: {}".format(tx_addr))
|
||||
raise ValueError(f"invalid tx_addr: {tx_addr}")
|
||||
|
||||
|
||||
class UdsClient():
|
||||
def __init__(self, panda, tx_addr: int, rx_addr: Optional[int] = None, bus: int = 0, sub_addr: Optional[int] = None, timeout: float = 1,
|
||||
def __init__(self, panda, tx_addr: int, rx_addr: int | None = None, bus: int = 0, sub_addr: int | None = None, timeout: float = 1,
|
||||
debug: bool = False, tx_timeout: float = 1, response_pending_timeout: float = 10):
|
||||
self.bus = bus
|
||||
self.tx_addr = tx_addr
|
||||
@@ -583,7 +584,7 @@ class UdsClient():
|
||||
self.response_pending_timeout = response_pending_timeout
|
||||
|
||||
# generic uds request
|
||||
def _uds_request(self, service_type: SERVICE_TYPE, subfunction: Optional[int] = None, data: Optional[bytes] = None) -> bytes:
|
||||
def _uds_request(self, service_type: SERVICE_TYPE, subfunction: int | None = None, data: bytes | None = None) -> bytes:
|
||||
req = bytes([service_type])
|
||||
if subfunction is not None:
|
||||
req += bytes([subfunction])
|
||||
@@ -623,12 +624,12 @@ class UdsClient():
|
||||
if self.debug:
|
||||
print("UDS-RX: response pending")
|
||||
continue
|
||||
raise NegativeResponseError('{} - {}'.format(service_desc, error_desc), service_id, error_code)
|
||||
raise NegativeResponseError(f'{service_desc} - {error_desc}', service_id, error_code)
|
||||
|
||||
# positive response
|
||||
if service_type + 0x40 != resp_sid:
|
||||
resp_sid_hex = hex(resp_sid) if resp_sid is not None else None
|
||||
raise InvalidServiceIdError('invalid response service id: {}'.format(resp_sid_hex))
|
||||
raise InvalidServiceIdError(f'invalid response service id: {resp_sid_hex}')
|
||||
|
||||
if subfunction is not None:
|
||||
resp_sfn = resp[1] if len(resp) > 1 else None
|
||||
@@ -671,7 +672,7 @@ class UdsClient():
|
||||
def tester_present(self, ):
|
||||
self._uds_request(SERVICE_TYPE.TESTER_PRESENT, subfunction=0x00)
|
||||
|
||||
def access_timing_parameter(self, timing_parameter_type: TIMING_PARAMETER_TYPE, parameter_values: Optional[bytes] = None):
|
||||
def access_timing_parameter(self, timing_parameter_type: TIMING_PARAMETER_TYPE, parameter_values: bytes | None = None):
|
||||
write_custom_values = timing_parameter_type == TIMING_PARAMETER_TYPE.SET_TO_GIVEN_VALUES
|
||||
read_values = (timing_parameter_type == TIMING_PARAMETER_TYPE.READ_CURRENTLY_ACTIVE or
|
||||
timing_parameter_type == TIMING_PARAMETER_TYPE.READ_EXTENDED_SET)
|
||||
@@ -714,8 +715,8 @@ class UdsClient():
|
||||
"data": resp[2:], # TODO: parse the reset of response
|
||||
}
|
||||
|
||||
def link_control(self, link_control_type: LINK_CONTROL_TYPE, baud_rate_type: Optional[BAUD_RATE_TYPE] = None):
|
||||
data: Optional[bytes]
|
||||
def link_control(self, link_control_type: LINK_CONTROL_TYPE, baud_rate_type: BAUD_RATE_TYPE | None = None):
|
||||
data: bytes | None
|
||||
|
||||
if link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE:
|
||||
# baud_rate_type = BAUD_RATE_TYPE
|
||||
@@ -733,21 +734,21 @@ class UdsClient():
|
||||
resp = self._uds_request(SERVICE_TYPE.READ_DATA_BY_IDENTIFIER, subfunction=None, data=data)
|
||||
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
|
||||
if resp_id != data_identifier_type:
|
||||
raise ValueError('invalid response data identifier: {} expected: {}'.format(hex(resp_id), hex(data_identifier_type)))
|
||||
raise ValueError(f'invalid response data identifier: {hex(resp_id)} expected: {hex(data_identifier_type)}')
|
||||
return resp[2:]
|
||||
|
||||
def read_memory_by_address(self, memory_address: int, memory_size: int, memory_address_bytes: int = 4, memory_size_bytes: int = 1):
|
||||
if memory_address_bytes < 1 or memory_address_bytes > 4:
|
||||
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
|
||||
raise ValueError(f'invalid memory_address_bytes: {memory_address_bytes}')
|
||||
if memory_size_bytes < 1 or memory_size_bytes > 4:
|
||||
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
|
||||
raise ValueError(f'invalid memory_size_bytes: {memory_size_bytes}')
|
||||
data = bytes([memory_size_bytes << 4 | memory_address_bytes])
|
||||
|
||||
if memory_address >= 1 << (memory_address_bytes * 8):
|
||||
raise ValueError('invalid memory_address: {}'.format(memory_address))
|
||||
raise ValueError(f'invalid memory_address: {memory_address}')
|
||||
data += struct.pack('!I', memory_address)[4 - memory_address_bytes:]
|
||||
if memory_size >= 1 << (memory_size_bytes * 8):
|
||||
raise ValueError('invalid memory_size: {}'.format(memory_size))
|
||||
raise ValueError(f'invalid memory_size: {memory_size}')
|
||||
data += struct.pack('!I', memory_size)[4 - memory_size_bytes:]
|
||||
|
||||
resp = self._uds_request(SERVICE_TYPE.READ_MEMORY_BY_ADDRESS, subfunction=None, data=data)
|
||||
@@ -758,7 +759,7 @@ class UdsClient():
|
||||
resp = self._uds_request(SERVICE_TYPE.READ_SCALING_DATA_BY_IDENTIFIER, subfunction=None, data=data)
|
||||
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
|
||||
if resp_id != data_identifier_type:
|
||||
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
|
||||
raise ValueError(f'invalid response data identifier: {hex(resp_id)}')
|
||||
return resp[2:] # TODO: parse the response
|
||||
|
||||
def read_data_by_periodic_identifier(self, transmission_mode_type: TRANSMISSION_MODE_TYPE, periodic_data_identifier: int):
|
||||
@@ -767,11 +768,11 @@ class UdsClient():
|
||||
self._uds_request(SERVICE_TYPE.READ_DATA_BY_PERIODIC_IDENTIFIER, subfunction=None, data=data)
|
||||
|
||||
def dynamically_define_data_identifier(self, dynamic_definition_type: DYNAMIC_DEFINITION_TYPE, dynamic_data_identifier: int,
|
||||
source_definitions: List[DynamicSourceDefinition], memory_address_bytes: int = 4, memory_size_bytes: int = 1):
|
||||
source_definitions: list[DynamicSourceDefinition], memory_address_bytes: int = 4, memory_size_bytes: int = 1):
|
||||
if memory_address_bytes < 1 or memory_address_bytes > 4:
|
||||
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
|
||||
raise ValueError(f'invalid memory_address_bytes: {memory_address_bytes}')
|
||||
if memory_size_bytes < 1 or memory_size_bytes > 4:
|
||||
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
|
||||
raise ValueError(f'invalid memory_size_bytes: {memory_size_bytes}')
|
||||
|
||||
data = struct.pack('!H', dynamic_data_identifier)
|
||||
if dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_IDENTIFIER:
|
||||
@@ -781,15 +782,15 @@ class UdsClient():
|
||||
data += bytes([memory_size_bytes << 4 | memory_address_bytes])
|
||||
for s in source_definitions:
|
||||
if s.memory_address >= 1 << (memory_address_bytes * 8):
|
||||
raise ValueError('invalid memory_address: {}'.format(s.memory_address))
|
||||
raise ValueError(f'invalid memory_address: {s.memory_address}')
|
||||
data += struct.pack('!I', s.memory_address)[4 - memory_address_bytes:]
|
||||
if s.memory_size >= 1 << (memory_size_bytes * 8):
|
||||
raise ValueError('invalid memory_size: {}'.format(s.memory_size))
|
||||
raise ValueError(f'invalid memory_size: {s.memory_size}')
|
||||
data += struct.pack('!I', s.memory_size)[4 - memory_size_bytes:]
|
||||
elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER:
|
||||
pass
|
||||
else:
|
||||
raise ValueError('invalid dynamic identifier type: {}'.format(hex(dynamic_definition_type)))
|
||||
raise ValueError(f'invalid dynamic identifier type: {hex(dynamic_definition_type)}')
|
||||
self._uds_request(SERVICE_TYPE.DYNAMICALLY_DEFINE_DATA_IDENTIFIER, subfunction=dynamic_definition_type, data=data)
|
||||
|
||||
def write_data_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE, data_record: bytes):
|
||||
@@ -797,20 +798,20 @@ class UdsClient():
|
||||
resp = self._uds_request(SERVICE_TYPE.WRITE_DATA_BY_IDENTIFIER, subfunction=None, data=data)
|
||||
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
|
||||
if resp_id != data_identifier_type:
|
||||
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
|
||||
raise ValueError(f'invalid response data identifier: {hex(resp_id)}')
|
||||
|
||||
def write_memory_by_address(self, memory_address: int, memory_size: int, data_record: bytes, memory_address_bytes: int = 4, memory_size_bytes: int = 1):
|
||||
if memory_address_bytes < 1 or memory_address_bytes > 4:
|
||||
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
|
||||
raise ValueError(f'invalid memory_address_bytes: {memory_address_bytes}')
|
||||
if memory_size_bytes < 1 or memory_size_bytes > 4:
|
||||
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
|
||||
raise ValueError(f'invalid memory_size_bytes: {memory_size_bytes}')
|
||||
data = bytes([memory_size_bytes << 4 | memory_address_bytes])
|
||||
|
||||
if memory_address >= 1 << (memory_address_bytes * 8):
|
||||
raise ValueError('invalid memory_address: {}'.format(memory_address))
|
||||
raise ValueError(f'invalid memory_address: {memory_address}')
|
||||
data += struct.pack('!I', memory_address)[4 - memory_address_bytes:]
|
||||
if memory_size >= 1 << (memory_size_bytes * 8):
|
||||
raise ValueError('invalid memory_size: {}'.format(memory_size))
|
||||
raise ValueError(f'invalid memory_size: {memory_size}')
|
||||
data += struct.pack('!I', memory_size)[4 - memory_size_bytes:]
|
||||
|
||||
data += data_record
|
||||
@@ -864,7 +865,7 @@ class UdsClient():
|
||||
resp = self._uds_request(SERVICE_TYPE.INPUT_OUTPUT_CONTROL_BY_IDENTIFIER, subfunction=None, data=data)
|
||||
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
|
||||
if resp_id != data_identifier_type:
|
||||
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
|
||||
raise ValueError(f'invalid response data identifier: {hex(resp_id)}')
|
||||
return resp[2:]
|
||||
|
||||
def routine_control(self, routine_control_type: ROUTINE_CONTROL_TYPE, routine_identifier_type: ROUTINE_IDENTIFIER_TYPE, routine_option_record: bytes = b''):
|
||||
@@ -872,23 +873,23 @@ class UdsClient():
|
||||
resp = self._uds_request(SERVICE_TYPE.ROUTINE_CONTROL, subfunction=routine_control_type, data=data)
|
||||
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
|
||||
if resp_id != routine_identifier_type:
|
||||
raise ValueError('invalid response routine identifier: {}'.format(hex(resp_id)))
|
||||
raise ValueError(f'invalid response routine identifier: {hex(resp_id)}')
|
||||
return resp[2:]
|
||||
|
||||
def request_download(self, memory_address: int, memory_size: int, memory_address_bytes: int = 4, memory_size_bytes: int = 4, data_format: int = 0x00):
|
||||
data = bytes([data_format])
|
||||
|
||||
if memory_address_bytes < 1 or memory_address_bytes > 4:
|
||||
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
|
||||
raise ValueError(f'invalid memory_address_bytes: {memory_address_bytes}')
|
||||
if memory_size_bytes < 1 or memory_size_bytes > 4:
|
||||
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
|
||||
raise ValueError(f'invalid memory_size_bytes: {memory_size_bytes}')
|
||||
data += bytes([memory_size_bytes << 4 | memory_address_bytes])
|
||||
|
||||
if memory_address >= 1 << (memory_address_bytes * 8):
|
||||
raise ValueError('invalid memory_address: {}'.format(memory_address))
|
||||
raise ValueError(f'invalid memory_address: {memory_address}')
|
||||
data += struct.pack('!I', memory_address)[4 - memory_address_bytes:]
|
||||
if memory_size >= 1 << (memory_size_bytes * 8):
|
||||
raise ValueError('invalid memory_size: {}'.format(memory_size))
|
||||
raise ValueError(f'invalid memory_size: {memory_size}')
|
||||
data += struct.pack('!I', memory_size)[4 - memory_size_bytes:]
|
||||
|
||||
resp = self._uds_request(SERVICE_TYPE.REQUEST_DOWNLOAD, subfunction=None, data=data)
|
||||
@@ -896,7 +897,7 @@ class UdsClient():
|
||||
if max_num_bytes_len >= 1 and max_num_bytes_len <= 4:
|
||||
max_num_bytes = struct.unpack('!I', (b"\x00" * (4 - max_num_bytes_len)) + resp[1:max_num_bytes_len + 1])[0]
|
||||
else:
|
||||
raise ValueError('invalid max_num_bytes_len: {}'.format(max_num_bytes_len))
|
||||
raise ValueError(f'invalid max_num_bytes_len: {max_num_bytes_len}')
|
||||
|
||||
return max_num_bytes # max number of bytes per transfer data request
|
||||
|
||||
@@ -904,16 +905,16 @@ class UdsClient():
|
||||
data = bytes([data_format])
|
||||
|
||||
if memory_address_bytes < 1 or memory_address_bytes > 4:
|
||||
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
|
||||
raise ValueError(f'invalid memory_address_bytes: {memory_address_bytes}')
|
||||
if memory_size_bytes < 1 or memory_size_bytes > 4:
|
||||
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
|
||||
raise ValueError(f'invalid memory_size_bytes: {memory_size_bytes}')
|
||||
data += bytes([memory_size_bytes << 4 | memory_address_bytes])
|
||||
|
||||
if memory_address >= 1 << (memory_address_bytes * 8):
|
||||
raise ValueError('invalid memory_address: {}'.format(memory_address))
|
||||
raise ValueError(f'invalid memory_address: {memory_address}')
|
||||
data += struct.pack('!I', memory_address)[4 - memory_address_bytes:]
|
||||
if memory_size >= 1 << (memory_size_bytes * 8):
|
||||
raise ValueError('invalid memory_size: {}'.format(memory_size))
|
||||
raise ValueError(f'invalid memory_size: {memory_size}')
|
||||
data += struct.pack('!I', memory_size)[4 - memory_size_bytes:]
|
||||
|
||||
resp = self._uds_request(SERVICE_TYPE.REQUEST_UPLOAD, subfunction=None, data=data)
|
||||
@@ -921,7 +922,7 @@ class UdsClient():
|
||||
if max_num_bytes_len >= 1 and max_num_bytes_len <= 4:
|
||||
max_num_bytes = struct.unpack('!I', (b"\x00" * (4 - max_num_bytes_len)) + resp[1:max_num_bytes_len + 1])[0]
|
||||
else:
|
||||
raise ValueError('invalid max_num_bytes_len: {}'.format(max_num_bytes_len))
|
||||
raise ValueError(f'invalid max_num_bytes_len: {max_num_bytes_len}')
|
||||
|
||||
return max_num_bytes # max number of bytes per transfer data request
|
||||
|
||||
@@ -930,7 +931,7 @@ class UdsClient():
|
||||
resp = self._uds_request(SERVICE_TYPE.TRANSFER_DATA, subfunction=None, data=data)
|
||||
resp_id = resp[0] if len(resp) > 0 else None
|
||||
if resp_id != block_sequence_count:
|
||||
raise ValueError('invalid block_sequence_count: {}'.format(resp_id))
|
||||
raise ValueError(f'invalid block_sequence_count: {resp_id}')
|
||||
return resp[1:]
|
||||
|
||||
def request_transfer_exit(self):
|
||||
|
||||
Reference in New Issue
Block a user