wip
This commit is contained in:
1
opendbc/can/tests/.gitignore
vendored
Normal file
1
opendbc/can/tests/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
*.bz2
|
||||
8
opendbc/can/tests/__init__.py
Normal file
8
opendbc/can/tests/__init__.py
Normal file
@@ -0,0 +1,8 @@
|
||||
import glob
|
||||
import os
|
||||
|
||||
from opendbc import DBC_PATH
|
||||
|
||||
ALL_DBCS = [os.path.basename(dbc).split('.')[0] for dbc in
|
||||
glob.glob(f"{DBC_PATH}/*.dbc")]
|
||||
TEST_DBC = os.path.abspath(os.path.join(os.path.dirname(__file__), "test.dbc"))
|
||||
27
opendbc/can/tests/test.dbc
Normal file
27
opendbc/can/tests/test.dbc
Normal file
@@ -0,0 +1,27 @@
|
||||
CM_ "This DBC is used for the CAN parser and packer tests.";
|
||||
|
||||
BO_ 228 STEERING_CONTROL: 5 EON
|
||||
SG_ STEER_TORQUE_REQUEST : 23|1@0+ (1,0) [0|1] "" EPS
|
||||
SG_ SET_ME_X00 : 22|7@0+ (1,0) [0|127] "" EPS
|
||||
SG_ SET_ME_X00_2 : 31|8@0+ (1,0) [0|0] "" EPS
|
||||
SG_ STEER_TORQUE : 7|16@0- (1,0) [-4096|4096] "" EPS
|
||||
SG_ STEER_DOWN_TO_ZERO : 38|1@0+ (1,0) [0|1] "" EPS
|
||||
SG_ COUNTER : 37|2@0+ (1,0) [0|3] "" EPS
|
||||
SG_ CHECKSUM : 35|4@0+ (1,0) [0|15] "" EPS
|
||||
|
||||
BO_ 316 Brake_Status: 8 XXX
|
||||
SG_ CHECKSUM : 0|8@1+ (1,0) [0|255] "" XXX
|
||||
SG_ COUNTER : 8|4@1+ (1,0) [0|15] "" XXX
|
||||
SG_ Signal1 : 12|46@1+ (1,0) [0|1] "" XXX
|
||||
SG_ ES_Brake : 58|1@1+ (1,0) [0|1] "" XXX
|
||||
SG_ Signal2 : 59|3@1+ (1,0) [0|1] "" XXX
|
||||
SG_ Brake : 62|1@1+ (1,0) [0|1] "" XXX
|
||||
SG_ Signal3 : 63|1@1+ (1,0) [0|1] "" XXX
|
||||
|
||||
BO_ 245 CAN_FD_MESSAGE: 32 XXX
|
||||
SG_ COUNTER : 7|8@0+ (1,0) [0|1] "" XXX
|
||||
SG_ SIGNED : 22|16@0- (1,0) [0|1] "" XXX
|
||||
SG_ 64_BIT_LE : 159|64@1+ (1,0) [0|1] "" XXX
|
||||
SG_ 64_BIT_BE : 80|64@0+ (1,0) [0|1] "" XXX
|
||||
|
||||
VAL_ 80 NON_EXISTENT_ADDR 0 "test";
|
||||
42
opendbc/can/tests/test_checksums.py
Normal file
42
opendbc/can/tests/test_checksums.py
Normal file
@@ -0,0 +1,42 @@
|
||||
#!/usr/bin/env python3
|
||||
import unittest
|
||||
|
||||
from opendbc.can.parser import CANParser
|
||||
from opendbc.can.packer import CANPacker
|
||||
from opendbc.can.tests.test_packer_parser import can_list_to_can_capnp
|
||||
|
||||
|
||||
class TestCanChecksums(unittest.TestCase):
|
||||
|
||||
def test_honda_checksum(self):
|
||||
"""Test checksums for Honda standard and extended CAN ids"""
|
||||
dbc_file = "honda_accord_2018_can_generated"
|
||||
msgs = [("LKAS_HUD", 0), ("LKAS_HUD_A", 0)]
|
||||
parser = CANParser(dbc_file, msgs, 0)
|
||||
packer = CANPacker(dbc_file)
|
||||
|
||||
values = {
|
||||
'SET_ME_X41': 0x41,
|
||||
'STEERING_REQUIRED': 1,
|
||||
'SOLID_LANES': 1,
|
||||
'BEEP': 0,
|
||||
}
|
||||
|
||||
# known correct checksums according to the above values
|
||||
checksum_std = [11, 10, 9, 8]
|
||||
checksum_ext = [4, 3, 2, 1]
|
||||
|
||||
for std, ext in zip(checksum_std, checksum_ext):
|
||||
msgs = [
|
||||
packer.make_can_msg("LKAS_HUD", 0, values),
|
||||
packer.make_can_msg("LKAS_HUD_A", 0, values),
|
||||
]
|
||||
can_strings = [can_list_to_can_capnp(msgs), ]
|
||||
parser.update_strings(can_strings)
|
||||
|
||||
self.assertEqual(parser.vl['LKAS_HUD']['CHECKSUM'], std)
|
||||
self.assertEqual(parser.vl['LKAS_HUD_A']['CHECKSUM'], ext)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
36
opendbc/can/tests/test_dbc_exceptions.py
Normal file
36
opendbc/can/tests/test_dbc_exceptions.py
Normal file
@@ -0,0 +1,36 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import unittest
|
||||
|
||||
from opendbc.can.parser import CANParser, CANDefine
|
||||
from opendbc.can.packer import CANPacker
|
||||
from opendbc.can.tests import TEST_DBC
|
||||
|
||||
|
||||
class TestCanParserPackerExceptions(unittest.TestCase):
|
||||
def test_civic_exceptions(self):
|
||||
dbc_file = "honda_civic_touring_2016_can_generated"
|
||||
dbc_invalid = dbc_file + "abcdef"
|
||||
msgs = [("STEERING_CONTROL", 50)]
|
||||
with self.assertRaises(RuntimeError):
|
||||
CANParser(dbc_invalid, msgs, 0)
|
||||
with self.assertRaises(RuntimeError):
|
||||
CANPacker(dbc_invalid)
|
||||
with self.assertRaises(RuntimeError):
|
||||
CANDefine(dbc_invalid)
|
||||
with self.assertRaises(KeyError):
|
||||
CANDefine(TEST_DBC)
|
||||
|
||||
parser = CANParser(dbc_file, msgs, 0)
|
||||
with self.assertRaises(RuntimeError):
|
||||
parser.update_strings([b''])
|
||||
|
||||
# Everything is supposed to work below
|
||||
CANParser(dbc_file, msgs, 0)
|
||||
CANParser(dbc_file, [], 0)
|
||||
CANPacker(dbc_file)
|
||||
CANDefine(dbc_file)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
28
opendbc/can/tests/test_dbc_parser.py
Normal file
28
opendbc/can/tests/test_dbc_parser.py
Normal file
@@ -0,0 +1,28 @@
|
||||
#!/usr/bin/env python3
|
||||
import unittest
|
||||
|
||||
from opendbc.can.parser import CANParser
|
||||
from opendbc.can.tests import ALL_DBCS
|
||||
|
||||
|
||||
class TestDBCParser(unittest.TestCase):
|
||||
def test_enough_dbcs(self):
|
||||
# sanity check that we're running on the real DBCs
|
||||
self.assertGreater(len(ALL_DBCS), 20)
|
||||
|
||||
def test_parse_all_dbcs(self):
|
||||
"""
|
||||
Dynamic DBC parser checks:
|
||||
- Checksum and counter length, start bit, endianness
|
||||
- Duplicate message addresses and names
|
||||
- Signal out of bounds
|
||||
- All BO_, SG_, VAL_ lines for syntax errors
|
||||
"""
|
||||
|
||||
for dbc in ALL_DBCS:
|
||||
with self.subTest(dbc=dbc):
|
||||
CANParser(dbc, [], 0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
35
opendbc/can/tests/test_define.py
Normal file
35
opendbc/can/tests/test_define.py
Normal file
@@ -0,0 +1,35 @@
|
||||
#!/usr/bin/env python3
|
||||
import unittest
|
||||
|
||||
from opendbc.can.can_define import CANDefine
|
||||
from opendbc.can.tests import ALL_DBCS
|
||||
|
||||
|
||||
class TestCADNDefine(unittest.TestCase):
|
||||
def test_civic(self):
|
||||
|
||||
dbc_file = "honda_civic_touring_2016_can_generated"
|
||||
defs = CANDefine(dbc_file)
|
||||
|
||||
self.assertDictEqual(defs.dv[399], defs.dv['STEER_STATUS'])
|
||||
self.assertDictEqual(defs.dv[399],
|
||||
{'STEER_STATUS':
|
||||
{7: 'PERMANENT_FAULT',
|
||||
6: 'TMP_FAULT',
|
||||
5: 'FAULT_1',
|
||||
4: 'NO_TORQUE_ALERT_2',
|
||||
3: 'LOW_SPEED_LOCKOUT',
|
||||
2: 'NO_TORQUE_ALERT_1',
|
||||
0: 'NORMAL'}
|
||||
}
|
||||
)
|
||||
|
||||
def test_all_dbcs(self):
|
||||
# Asserts no exceptions on all DBCs
|
||||
for dbc in ALL_DBCS:
|
||||
with self.subTest(dbc=dbc):
|
||||
CANDefine(dbc)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
386
opendbc/can/tests/test_packer_parser.py
Normal file
386
opendbc/can/tests/test_packer_parser.py
Normal file
@@ -0,0 +1,386 @@
|
||||
#!/usr/bin/env python3
|
||||
import unittest
|
||||
import random
|
||||
|
||||
import cereal.messaging as messaging
|
||||
from opendbc.can.parser import CANParser
|
||||
from opendbc.can.packer import CANPacker
|
||||
from opendbc.can.tests import TEST_DBC
|
||||
|
||||
MAX_BAD_COUNTER = 5
|
||||
|
||||
|
||||
# Python implementation so we don't have to depend on boardd
|
||||
def can_list_to_can_capnp(can_msgs, msgtype='can', logMonoTime=None):
|
||||
dat = messaging.new_message(msgtype, len(can_msgs))
|
||||
|
||||
if logMonoTime is not None:
|
||||
dat.logMonoTime = logMonoTime
|
||||
|
||||
for i, can_msg in enumerate(can_msgs):
|
||||
if msgtype == 'sendcan':
|
||||
cc = dat.sendcan[i]
|
||||
else:
|
||||
cc = dat.can[i]
|
||||
|
||||
cc.address = can_msg[0]
|
||||
cc.busTime = can_msg[1]
|
||||
cc.dat = bytes(can_msg[2])
|
||||
cc.src = can_msg[3]
|
||||
|
||||
return dat.to_bytes()
|
||||
|
||||
|
||||
class TestCanParserPacker(unittest.TestCase):
|
||||
def test_packer(self):
|
||||
packer = CANPacker(TEST_DBC)
|
||||
|
||||
for b in range(6):
|
||||
for i in range(256):
|
||||
values = {"COUNTER": i}
|
||||
addr, _, dat, bus = packer.make_can_msg("CAN_FD_MESSAGE", b, values)
|
||||
self.assertEqual(addr, 245)
|
||||
self.assertEqual(bus, b)
|
||||
self.assertEqual(dat[0], i)
|
||||
|
||||
def test_packer_counter(self):
|
||||
msgs = [("CAN_FD_MESSAGE", 0), ]
|
||||
packer = CANPacker(TEST_DBC)
|
||||
parser = CANParser(TEST_DBC, msgs, 0)
|
||||
|
||||
# packer should increment the counter
|
||||
for i in range(1000):
|
||||
msg = packer.make_can_msg("CAN_FD_MESSAGE", 0, {})
|
||||
dat = can_list_to_can_capnp([msg, ])
|
||||
parser.update_strings([dat])
|
||||
self.assertEqual(parser.vl["CAN_FD_MESSAGE"]["COUNTER"], i % 256)
|
||||
|
||||
# setting COUNTER should override
|
||||
for _ in range(100):
|
||||
cnt = random.randint(0, 255)
|
||||
msg = packer.make_can_msg("CAN_FD_MESSAGE", 0, {
|
||||
"COUNTER": cnt,
|
||||
})
|
||||
dat = can_list_to_can_capnp([msg, ])
|
||||
parser.update_strings([dat])
|
||||
self.assertEqual(parser.vl["CAN_FD_MESSAGE"]["COUNTER"], cnt)
|
||||
|
||||
# then, should resume counting from the override value
|
||||
cnt = parser.vl["CAN_FD_MESSAGE"]["COUNTER"]
|
||||
for i in range(100):
|
||||
msg = packer.make_can_msg("CAN_FD_MESSAGE", 0, {})
|
||||
dat = can_list_to_can_capnp([msg, ])
|
||||
parser.update_strings([dat])
|
||||
self.assertEqual(parser.vl["CAN_FD_MESSAGE"]["COUNTER"], (cnt + i) % 256)
|
||||
|
||||
def test_parser_can_valid(self):
|
||||
msgs = [("CAN_FD_MESSAGE", 10), ]
|
||||
packer = CANPacker(TEST_DBC)
|
||||
parser = CANParser(TEST_DBC, msgs, 0)
|
||||
|
||||
# shouldn't be valid initially
|
||||
self.assertFalse(parser.can_valid)
|
||||
|
||||
# not valid until the message is seen
|
||||
for _ in range(100):
|
||||
dat = can_list_to_can_capnp([])
|
||||
parser.update_strings([dat])
|
||||
self.assertFalse(parser.can_valid)
|
||||
|
||||
# valid once seen
|
||||
for i in range(1, 100):
|
||||
t = int(0.01 * i * 1e9)
|
||||
msg = packer.make_can_msg("CAN_FD_MESSAGE", 0, {})
|
||||
dat = can_list_to_can_capnp([msg, ], logMonoTime=t)
|
||||
parser.update_strings([dat])
|
||||
self.assertTrue(parser.can_valid)
|
||||
|
||||
def test_parser_counter_can_valid(self):
|
||||
"""
|
||||
Tests number of allowed bad counters + ensures CAN stays invalid
|
||||
while receiving invalid messages + that we can recover
|
||||
"""
|
||||
msgs = [
|
||||
("STEERING_CONTROL", 0),
|
||||
]
|
||||
packer = CANPacker("honda_civic_touring_2016_can_generated")
|
||||
parser = CANParser("honda_civic_touring_2016_can_generated", msgs, 0)
|
||||
|
||||
msg = packer.make_can_msg("STEERING_CONTROL", 0, {"COUNTER": 0})
|
||||
bts = can_list_to_can_capnp([msg])
|
||||
|
||||
# bad static counter, invalid once it's seen MAX_BAD_COUNTER messages
|
||||
for idx in range(0x1000):
|
||||
parser.update_strings([bts])
|
||||
self.assertEqual((idx + 1) < MAX_BAD_COUNTER, parser.can_valid)
|
||||
|
||||
# one to recover
|
||||
msg = packer.make_can_msg("STEERING_CONTROL", 0, {"COUNTER": 1})
|
||||
bts = can_list_to_can_capnp([msg])
|
||||
parser.update_strings([bts])
|
||||
self.assertTrue(parser.can_valid)
|
||||
|
||||
def test_parser_no_partial_update(self):
|
||||
"""
|
||||
Ensure that the CANParser doesn't partially update messages with invalid signals (COUNTER/CHECKSUM).
|
||||
Previously, the signal update loop would only break once it got to one of these invalid signals,
|
||||
after already updating most/all of the signals.
|
||||
"""
|
||||
msgs = [
|
||||
("STEERING_CONTROL", 0),
|
||||
]
|
||||
packer = CANPacker("honda_civic_touring_2016_can_generated")
|
||||
parser = CANParser("honda_civic_touring_2016_can_generated", msgs, 0)
|
||||
|
||||
def rx_steering_msg(values, bad_checksum=False):
|
||||
msg = packer.make_can_msg("STEERING_CONTROL", 0, values)
|
||||
if bad_checksum:
|
||||
# add 1 to checksum
|
||||
msg[2] = bytearray(msg[2])
|
||||
msg[2][4] = (msg[2][4] & 0xF0) | ((msg[2][4] & 0x0F) + 1)
|
||||
|
||||
bts = can_list_to_can_capnp([msg])
|
||||
parser.update_strings([bts])
|
||||
|
||||
rx_steering_msg({"STEER_TORQUE": 100}, bad_checksum=False)
|
||||
self.assertEqual(parser.vl["STEERING_CONTROL"]["STEER_TORQUE"], 100)
|
||||
self.assertEqual(parser.vl_all["STEERING_CONTROL"]["STEER_TORQUE"], [100])
|
||||
|
||||
for _ in range(5):
|
||||
rx_steering_msg({"STEER_TORQUE": 200}, bad_checksum=True)
|
||||
self.assertEqual(parser.vl["STEERING_CONTROL"]["STEER_TORQUE"], 100)
|
||||
self.assertEqual(parser.vl_all["STEERING_CONTROL"]["STEER_TORQUE"], [])
|
||||
|
||||
# Even if CANParser doesn't update instantaneous vl, make sure it didn't add invalid values to vl_all
|
||||
rx_steering_msg({"STEER_TORQUE": 300}, bad_checksum=False)
|
||||
self.assertEqual(parser.vl["STEERING_CONTROL"]["STEER_TORQUE"], 300)
|
||||
self.assertEqual(parser.vl_all["STEERING_CONTROL"]["STEER_TORQUE"], [300])
|
||||
|
||||
def test_packer_parser(self):
|
||||
msgs = [
|
||||
("Brake_Status", 0),
|
||||
("CAN_FD_MESSAGE", 0),
|
||||
("STEERING_CONTROL", 0),
|
||||
]
|
||||
packer = CANPacker(TEST_DBC)
|
||||
parser = CANParser(TEST_DBC, msgs, 0)
|
||||
|
||||
for steer in range(-256, 255):
|
||||
for active in (1, 0):
|
||||
values = {
|
||||
"STEERING_CONTROL": {
|
||||
"STEER_TORQUE": steer,
|
||||
"STEER_TORQUE_REQUEST": active,
|
||||
},
|
||||
"Brake_Status": {
|
||||
"Signal1": 61042322657536.0,
|
||||
},
|
||||
"CAN_FD_MESSAGE": {
|
||||
"SIGNED": steer,
|
||||
"64_BIT_LE": random.randint(0, 100),
|
||||
"64_BIT_BE": random.randint(0, 100),
|
||||
},
|
||||
}
|
||||
|
||||
msgs = [packer.make_can_msg(k, 0, v) for k, v in values.items()]
|
||||
bts = can_list_to_can_capnp(msgs)
|
||||
parser.update_strings([bts])
|
||||
|
||||
for k, v in values.items():
|
||||
for key, val in v.items():
|
||||
self.assertAlmostEqual(parser.vl[k][key], val)
|
||||
|
||||
# also check address
|
||||
for sig in ("STEER_TORQUE", "STEER_TORQUE_REQUEST", "COUNTER", "CHECKSUM"):
|
||||
self.assertEqual(parser.vl["STEERING_CONTROL"][sig], parser.vl[228][sig])
|
||||
|
||||
def test_scale_offset(self):
|
||||
"""Test that both scale and offset are correctly preserved"""
|
||||
dbc_file = "honda_civic_touring_2016_can_generated"
|
||||
msgs = [("VSA_STATUS", 50)]
|
||||
parser = CANParser(dbc_file, msgs, 0)
|
||||
packer = CANPacker(dbc_file)
|
||||
|
||||
for brake in range(100):
|
||||
values = {"USER_BRAKE": brake}
|
||||
msgs = packer.make_can_msg("VSA_STATUS", 0, values)
|
||||
bts = can_list_to_can_capnp([msgs])
|
||||
|
||||
parser.update_strings([bts])
|
||||
|
||||
self.assertAlmostEqual(parser.vl["VSA_STATUS"]["USER_BRAKE"], brake)
|
||||
|
||||
def test_subaru(self):
|
||||
# Subaru is little endian
|
||||
|
||||
dbc_file = "subaru_global_2017_generated"
|
||||
|
||||
msgs = [("ES_LKAS", 50)]
|
||||
|
||||
parser = CANParser(dbc_file, msgs, 0)
|
||||
packer = CANPacker(dbc_file)
|
||||
|
||||
idx = 0
|
||||
for steer in range(-256, 255):
|
||||
for active in [1, 0]:
|
||||
values = {
|
||||
"LKAS_Output": steer,
|
||||
"LKAS_Request": active,
|
||||
"SET_1": 1
|
||||
}
|
||||
|
||||
msgs = packer.make_can_msg("ES_LKAS", 0, values)
|
||||
bts = can_list_to_can_capnp([msgs])
|
||||
parser.update_strings([bts])
|
||||
|
||||
self.assertAlmostEqual(parser.vl["ES_LKAS"]["LKAS_Output"], steer)
|
||||
self.assertAlmostEqual(parser.vl["ES_LKAS"]["LKAS_Request"], active)
|
||||
self.assertAlmostEqual(parser.vl["ES_LKAS"]["SET_1"], 1)
|
||||
self.assertAlmostEqual(parser.vl["ES_LKAS"]["COUNTER"], idx % 16)
|
||||
idx += 1
|
||||
|
||||
def test_bus_timeout(self):
|
||||
"""Test CAN bus timeout detection"""
|
||||
dbc_file = "honda_civic_touring_2016_can_generated"
|
||||
|
||||
freq = 100
|
||||
msgs = [("VSA_STATUS", freq), ("STEER_MOTOR_TORQUE", freq/2)]
|
||||
|
||||
parser = CANParser(dbc_file, msgs, 0)
|
||||
packer = CANPacker(dbc_file)
|
||||
|
||||
i = 0
|
||||
def send_msg(blank=False):
|
||||
nonlocal i
|
||||
i += 1
|
||||
t = i*((1 / freq) * 1e9)
|
||||
|
||||
if blank:
|
||||
msgs = []
|
||||
else:
|
||||
msgs = [packer.make_can_msg("VSA_STATUS", 0, {}), ]
|
||||
|
||||
can = can_list_to_can_capnp(msgs, logMonoTime=t)
|
||||
parser.update_strings([can, ])
|
||||
|
||||
# all good, no timeout
|
||||
for _ in range(1000):
|
||||
send_msg()
|
||||
self.assertFalse(parser.bus_timeout, str(_))
|
||||
|
||||
# timeout after 10 blank msgs
|
||||
for n in range(200):
|
||||
send_msg(blank=True)
|
||||
self.assertEqual(n >= 10, parser.bus_timeout)
|
||||
|
||||
# no timeout immediately after seen again
|
||||
send_msg()
|
||||
self.assertFalse(parser.bus_timeout)
|
||||
|
||||
def test_updated(self):
|
||||
"""Test updated value dict"""
|
||||
dbc_file = "honda_civic_touring_2016_can_generated"
|
||||
msgs = [("VSA_STATUS", 50)]
|
||||
parser = CANParser(dbc_file, msgs, 0)
|
||||
packer = CANPacker(dbc_file)
|
||||
|
||||
# Make sure nothing is updated
|
||||
self.assertEqual(len(parser.vl_all["VSA_STATUS"]["USER_BRAKE"]), 0)
|
||||
|
||||
idx = 0
|
||||
for _ in range(10):
|
||||
# Ensure CANParser holds the values of any duplicate messages over multiple frames
|
||||
user_brake_vals = [random.randrange(100) for _ in range(random.randrange(5, 10))]
|
||||
half_idx = len(user_brake_vals) // 2
|
||||
can_msgs = [[], []]
|
||||
for frame, brake_vals in enumerate((user_brake_vals[:half_idx], user_brake_vals[half_idx:])):
|
||||
for user_brake in brake_vals:
|
||||
values = {"USER_BRAKE": user_brake}
|
||||
can_msgs[frame].append(packer.make_can_msg("VSA_STATUS", 0, values))
|
||||
idx += 1
|
||||
|
||||
can_strings = [can_list_to_can_capnp(msgs) for msgs in can_msgs]
|
||||
parser.update_strings(can_strings)
|
||||
vl_all = parser.vl_all["VSA_STATUS"]["USER_BRAKE"]
|
||||
|
||||
self.assertEqual(vl_all, user_brake_vals)
|
||||
if len(user_brake_vals):
|
||||
self.assertEqual(vl_all[-1], parser.vl["VSA_STATUS"]["USER_BRAKE"])
|
||||
|
||||
def test_timestamp_nanos(self):
|
||||
"""Test message timestamp dict"""
|
||||
dbc_file = "honda_civic_touring_2016_can_generated"
|
||||
|
||||
msgs = [
|
||||
("VSA_STATUS", 50),
|
||||
("POWERTRAIN_DATA", 100),
|
||||
]
|
||||
|
||||
parser = CANParser(dbc_file, msgs, 0)
|
||||
packer = CANPacker(dbc_file)
|
||||
|
||||
# Check the default timestamp is zero
|
||||
for msg in ("VSA_STATUS", "POWERTRAIN_DATA"):
|
||||
ts_nanos = parser.ts_nanos[msg].values()
|
||||
self.assertEqual(set(ts_nanos), {0})
|
||||
|
||||
# Check:
|
||||
# - timestamp is only updated for correct messages
|
||||
# - timestamp is correct for multiple runs
|
||||
# - timestamp is from the latest message if updating multiple strings
|
||||
for _ in range(10):
|
||||
can_strings = []
|
||||
log_mono_time = 0
|
||||
for i in range(10):
|
||||
log_mono_time = int(0.01 * i * 1e+9)
|
||||
can_msg = packer.make_can_msg("VSA_STATUS", 0, {})
|
||||
can_strings.append(can_list_to_can_capnp([can_msg], logMonoTime=log_mono_time))
|
||||
parser.update_strings(can_strings)
|
||||
|
||||
ts_nanos = parser.ts_nanos["VSA_STATUS"].values()
|
||||
self.assertEqual(set(ts_nanos), {log_mono_time})
|
||||
ts_nanos = parser.ts_nanos["POWERTRAIN_DATA"].values()
|
||||
self.assertEqual(set(ts_nanos), {0})
|
||||
|
||||
def test_nonexistent_messages(self):
|
||||
# Ensure we don't allow messages not in the DBC
|
||||
existing_messages = ("STEERING_CONTROL", 228, "CAN_FD_MESSAGE", 245)
|
||||
|
||||
for msg in existing_messages:
|
||||
CANParser(TEST_DBC, [(msg, 0)])
|
||||
with self.assertRaises(RuntimeError):
|
||||
new_msg = msg + "1" if isinstance(msg, str) else msg + 1
|
||||
CANParser(TEST_DBC, [(new_msg, 0)])
|
||||
|
||||
def test_track_all_signals(self):
|
||||
parser = CANParser("toyota_nodsu_pt_generated", [("ACC_CONTROL", 0)])
|
||||
self.assertEqual(parser.vl["ACC_CONTROL"], {
|
||||
"ACCEL_CMD": 0,
|
||||
"ALLOW_LONG_PRESS": 0,
|
||||
"ACC_MALFUNCTION": 0,
|
||||
"RADAR_DIRTY": 0,
|
||||
"DISTANCE": 0,
|
||||
"MINI_CAR": 0,
|
||||
"ACC_TYPE": 0,
|
||||
"CANCEL_REQ": 0,
|
||||
"ACC_CUT_IN": 0,
|
||||
"LEAD_VEHICLE_STOPPED": 0,
|
||||
"PERMIT_BRAKING": 0,
|
||||
"RELEASE_STANDSTILL": 0,
|
||||
"ITS_CONNECT_LEAD": 0,
|
||||
"ACCEL_CMD_ALT": 0,
|
||||
"CHECKSUM": 0,
|
||||
})
|
||||
|
||||
def test_disallow_duplicate_messages(self):
|
||||
CANParser("toyota_nodsu_pt_generated", [("ACC_CONTROL", 5)])
|
||||
|
||||
with self.assertRaises(RuntimeError):
|
||||
CANParser("toyota_nodsu_pt_generated", [("ACC_CONTROL", 5), ("ACC_CONTROL", 10)])
|
||||
|
||||
with self.assertRaises(RuntimeError):
|
||||
CANParser("toyota_nodsu_pt_generated", [("ACC_CONTROL", 10), ("ACC_CONTROL", 10)])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
55
opendbc/can/tests/test_parser_performance.py
Normal file
55
opendbc/can/tests/test_parser_performance.py
Normal file
@@ -0,0 +1,55 @@
|
||||
#!/usr/bin/env python3
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from opendbc.can.parser import CANParser
|
||||
from opendbc.can.packer import CANPacker
|
||||
from opendbc.can.tests.test_packer_parser import can_list_to_can_capnp
|
||||
|
||||
|
||||
@unittest.skip("TODO: varies too much between machines")
|
||||
class TestParser(unittest.TestCase):
|
||||
def _benchmark(self, checks, thresholds, n):
|
||||
parser = CANParser('toyota_new_mc_pt_generated', checks, 0)
|
||||
packer = CANPacker('toyota_new_mc_pt_generated')
|
||||
|
||||
can_msgs = []
|
||||
for i in range(50000):
|
||||
values = {"ACC_CONTROL": {"ACC_TYPE": 1, "ALLOW_LONG_PRESS": 3}}
|
||||
msgs = [packer.make_can_msg(k, 0, v) for k, v in values.items()]
|
||||
bts = can_list_to_can_capnp(msgs, logMonoTime=int(0.01 * i * 1e9))
|
||||
can_msgs.append(bts)
|
||||
|
||||
ets = []
|
||||
for _ in range(25):
|
||||
if n > 1:
|
||||
strings = []
|
||||
for i in range(0, len(can_msgs), n):
|
||||
strings.append(can_msgs[i:i + n])
|
||||
t1 = time.process_time_ns()
|
||||
for m in strings:
|
||||
parser.update_strings(m)
|
||||
t2 = time.process_time_ns()
|
||||
else:
|
||||
t1 = time.process_time_ns()
|
||||
for m in can_msgs:
|
||||
parser.update_strings([m])
|
||||
t2 = time.process_time_ns()
|
||||
|
||||
ets.append(t2 - t1)
|
||||
|
||||
et = sum(ets) / len(ets)
|
||||
avg_nanos = et / len(can_msgs)
|
||||
print('%s: [%d] %.1fms to parse %s, avg: %dns' % (self._testMethodName, n, et/1e6, len(can_msgs), avg_nanos))
|
||||
|
||||
minn, maxx = thresholds
|
||||
self.assertLess(avg_nanos, maxx)
|
||||
self.assertGreater(avg_nanos, minn, "Performance seems to have improved, update test thresholds.")
|
||||
|
||||
def test_performance_all_signals(self):
|
||||
self._benchmark([('ACC_CONTROL', 10)], (10000, 19000), 1)
|
||||
self._benchmark([('ACC_CONTROL', 10)], (1300, 5000), 10)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user