Add openpilot tests

This commit is contained in:
FrogAi
2024-03-06 14:58:47 -07:00
parent 2901597132
commit b39097a12d
259 changed files with 31176 additions and 12 deletions

View File

@@ -0,0 +1,103 @@
import os
import time
import pytest
from panda import Panda, PandaDFU, McuType, BASEDIR
def check_signature(p):
    """Check that the panda is running the flashed, up-to-date application firmware."""
    in_bootstub = p.bootstub
    assert not in_bootstub, "Flashed firmware not booting. Stuck in bootstub."
    assert p.up_to_date()
def test_dfu(p):
    """Enter DFU mode via the bootstub -> bootloader path and verify re-enumeration."""
    app_mcu_type = p.get_mcu_type()
    dfu_serial = p.get_dfu_serial()

    # two-stage reset: app -> bootstub -> DFU bootloader
    p.reset(enter_bootstub=True)
    p.reset(enter_bootloader=True)
    assert Panda.wait_for_dfu(dfu_serial, timeout=19), "failed to enter DFU"

    dfu = PandaDFU(dfu_serial)
    # MCU type reported over DFU must match what the application reported
    assert dfu.get_mcu_type() == app_mcu_type
    assert dfu_serial in PandaDFU.list()

    # NOTE(review): clears any stale DFU error state before resetting — presumably
    # required for a clean exit from DFU; confirm against PandaDFU implementation
    dfu._handle.clear_status()
    dfu.reset()
    p.reconnect()
# TODO: make more comprehensive bootstub tests and run on a few production ones + current
# TODO: also test release-signed app
@pytest.mark.timeout(30)
def test_known_bootstub(p):
    """
    Test that compiled app can work with known production bootstub
    """
    known_bootstubs = {
        # covers the two cases listed in Panda.connect
        McuType.F4: [
            # case A - no bcdDevice or panda type, has to assume F4
            "bootstub_f4_first_dos_production.panda.bin",
            # case B - just bcdDevice
            "bootstub_f4_only_bcd.panda.bin",
        ],
        McuType.H7: ["bootstub.panda_h7.bin"],
    }

    for kb in known_bootstubs[p.get_mcu_type()]:
        # identifiers reported by the app, compared against the bootstub's below
        app_ids = (p.get_mcu_type(), p.get_usb_serial())
        assert None not in app_ids

        # drop into DFU so the known bootstub can be flashed
        p.reset(enter_bootstub=True)
        p.reset(enter_bootloader=True)
        dfu_serial = p.get_dfu_serial()
        assert Panda.wait_for_dfu(dfu_serial, timeout=30)
        dfu = PandaDFU(dfu_serial)

        with open(os.path.join(BASEDIR, "tests/hitl/known_bootstub", kb), "rb") as f:
            code = f.read()

        dfu.program_bootstub(code)
        dfu.reset()
        p.connect(claim=False, wait=True)

        # check for MCU or serial mismatch
        with Panda(p._serial, claim=False) as np:
            bootstub_ids = (np.get_mcu_type(), np.get_usb_serial())
            assert app_ids == bootstub_ids

        # ensure we can flash app and it jumps to app
        p.flash()
        check_signature(p)
        assert not p.bootstub
@pytest.mark.timeout(35)
def test_recover(p):
    """Recover the panda over DFU and check it boots the signed app firmware."""
    # The pytest timeout must exceed recover()'s own timeout (30s) — with the
    # previous timeout(25), a slow recovery was killed by pytest before
    # recover() could report its real failure mode.
    assert p.recover(timeout=30)
    check_signature(p)
@pytest.mark.timeout(25)
def test_flash(p):
    """Flash the application from the bootstub, then again from the running app."""
    # test flash from bootstub
    serial = p._serial
    assert serial is not None
    p.reset(enter_bootstub=True)
    p.close()
    time.sleep(2)  # give USB time to re-enumerate the bootstub

    with Panda(serial) as np:
        assert np.bootstub
        assert np._serial == serial
        np.flash()

    p.reconnect()
    p.reset()
    check_signature(p)

    # test flash from app
    p.flash()
    check_signature(p)

View File

@@ -0,0 +1,125 @@
import time
import pytest
from panda import Panda
from panda import PandaJungle
from panda.tests.hitl.conftest import PandaGroup
def test_ignition(p, panda_jungle):
    """Toggle the ignition line from the jungle and check the panda reports it."""
    # Set harness orientation to #2, since the ignition line is on the wrong SBU bus :/
    panda_jungle.set_harness_orientation(PandaJungle.HARNESS_ORIENTATION_2)
    p.reset()

    for ignition_state in (True, False):
        panda_jungle.set_ignition(ignition_state)
        time.sleep(0.1)
        health = p.health()
        assert health['ignition_line'] == ignition_state
@pytest.mark.test_panda_types(PandaGroup.GEN2)
def test_harness_status(p, panda_jungle):
    """Sweep harness orientations and ignition states; check the panda's detected
    orientation, ignition line, and SBU voltage readings."""
    # some rigs are wired flipped — detect once on the first non-NC sample and
    # account for it in the expected values below
    flipped = None
    for ignition in [True, False]:
        for orientation in [Panda.HARNESS_STATUS_NC, Panda.HARNESS_STATUS_NORMAL, Panda.HARNESS_STATUS_FLIPPED]:
            panda_jungle.set_harness_orientation(orientation)
            panda_jungle.set_ignition(ignition)
            time.sleep(1)  # let orientation detection settle

            health = p.health()
            detected_orientation = health['car_harness_status']
            print(f"set: {orientation} detected: {detected_orientation}")

            # Orientation
            if orientation == Panda.HARNESS_STATUS_NC:
                assert detected_orientation == Panda.HARNESS_STATUS_NC
            else:
                if flipped is None:
                    flipped = (detected_orientation != orientation)

                if orientation == Panda.HARNESS_STATUS_NORMAL:
                    assert detected_orientation == (Panda.HARNESS_STATUS_FLIPPED if flipped else Panda.HARNESS_STATUS_NORMAL)
                else:
                    assert detected_orientation == (Panda.HARNESS_STATUS_NORMAL if flipped else Panda.HARNESS_STATUS_FLIPPED)

            # Line ignition
            assert health['ignition_line'] == (False if orientation == Panda.HARNESS_STATUS_NC else ignition)

            # SBU voltages
            # NOTE(review): TRES appears to use a 1.8V SBU supply, others 3.3V — confirm against hardware docs
            supply_voltage_mV = 1800 if p.get_type() in [Panda.HW_TYPE_TRES, ] else 3300

            if orientation == Panda.HARNESS_STATUS_NC:
                # not connected: both SBU lines pulled near the supply rail
                assert health['sbu1_voltage_mV'] > 0.9 * supply_voltage_mV
                assert health['sbu2_voltage_mV'] > 0.9 * supply_voltage_mV
            else:
                # which physical SBU line carries relay vs ignition depends on orientation
                relay_line = 'sbu1_voltage_mV' if (detected_orientation == Panda.HARNESS_STATUS_FLIPPED) else 'sbu2_voltage_mV'
                ignition_line = 'sbu2_voltage_mV' if (detected_orientation == Panda.HARNESS_STATUS_FLIPPED) else 'sbu1_voltage_mV'

                assert health[relay_line] < 0.1 * supply_voltage_mV
                assert health[ignition_line] > health[relay_line]
                if ignition:
                    assert health[ignition_line] < 0.3 * supply_voltage_mV
                else:
                    assert health[ignition_line] > 0.9 * supply_voltage_mV
@pytest.mark.skip_panda_types((Panda.HW_TYPE_DOS, ))
def test_voltage(p):
    """Sample the supply voltage a few times; it should stay in a sane 12V-rail range."""
    for _ in range(10):
        measured_mv = p.health()['voltage']
        assert 11000 < measured_mv < 13000
        time.sleep(0.1)
def test_hw_type(p):
    """
    hw type should be same in bootstub as application
    """
    hw_type = p.get_type()
    mcu_type = p.get_mcu_type()
    assert mcu_type is not None
    app_uid = p.get_uid()

    usb_serial = p.get_usb_serial()
    assert app_uid == usb_serial

    # reboot into the bootstub and compare its identifiers with the app's
    p.reset(enter_bootstub=True, reconnect=True)
    p.close()
    time.sleep(3)  # give the bootstub time to enumerate

    with Panda(p.get_usb_serial()) as pp:
        assert pp.bootstub
        assert pp.get_type() == hw_type, "Bootstub and app hw type mismatch"
        assert pp.get_mcu_type() == mcu_type, "Bootstub and app MCU type mismatch"
        assert pp.get_uid() == app_uid
def test_heartbeat(p, panda_jungle):
    """After heartbeat loss the panda must fall back to SAFETY_SILENT and
    revoke controls_allowed."""
    panda_jungle.set_ignition(True)
    # TODO: add more cases here once the tests aren't super slow
    p.set_safety_mode(mode=Panda.SAFETY_HYUNDAI, param=Panda.FLAG_HYUNDAI_LONG)
    p.send_heartbeat()
    assert p.health()['safety_mode'] == Panda.SAFETY_HYUNDAI
    assert p.health()['safety_param'] == Panda.FLAG_HYUNDAI_LONG

    # shouldn't do anything once we're in a car safety mode
    p.set_heartbeat_disabled()

    time.sleep(6.)  # wait past the heartbeat timeout

    h = p.health()
    assert h['heartbeat_lost']
    assert h['safety_mode'] == Panda.SAFETY_SILENT
    assert h['safety_param'] == 0
    assert h['controls_allowed'] == 0
def test_microsecond_timer(p):
    """Check the panda's microsecond timer tracks wall-clock time over one second."""
    t0 = p.get_microsecond_timer()
    time.sleep(1)
    t1 = p.get_microsecond_timer()

    # account for uint32 overflow
    if t1 < t0:
        t1 += 2**32

    time_diff = (t1 - t0) / 1e6
    assert 0.98 < time_diff < 1.02, f"Timer not running at the correct speed! (got {time_diff:.2f}s instead of 1.0s)"

View File

@@ -0,0 +1,127 @@
import time
import pytest
from flaky import flaky
from panda import Panda
from panda.tests.hitl.conftest import SPEED_NORMAL, SPEED_GMLAN, PandaGroup
from panda.tests.hitl.helpers import time_many_sends
def test_can_loopback(p):
    """Loop back one message on each CAN bus; expect both a TX echo and a loopback copy."""
    p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
    p.set_can_loopback(True)

    for bus in (0, 1, 2):
        # set this bus's speed to 500 kbps
        p.set_can_speed_kbps(bus, 500)

        # send a message on this bus
        p.can_send(0x1aa, b"message", bus)

        # confirm receive both on loopback and send receipt
        time.sleep(0.05)
        r = p.can_recv()
        # bus | 0x80 marks the TX echo; the plain bus number is the loopback copy
        sr = [x for x in r if x[3] == 0x80 | bus]
        lb = [x for x in r if x[3] == bus]
        assert len(sr) == 1
        assert len(lb) == 1

        # confirm data is correct
        assert 0x1aa == sr[0][0] == lb[0][0]
        assert b"message" == sr[0][2] == lb[0][2]
def test_reliability(p):
    """Repeatedly loop back 100 distinct messages; all must arrive, and quickly."""
    MSG_COUNT = 100

    p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
    p.set_can_loopback(True)
    p.set_can_speed_kbps(0, 1000)

    addrs = list(range(100, 100 + MSG_COUNT))
    ts = [(j, 0, b"\xaa" * 8, 0) for j in addrs]

    for _ in range(100):
        st = time.monotonic()

        p.can_send_many(ts)

        # expect one TX echo + one loopback copy per message = 200 total
        r = []
        while len(r) < 200 and (time.monotonic() - st) < 0.5:
            r.extend(p.can_recv())

        sent_echo = [x for x in r if x[3] == 0x80]
        loopback_resp = [x for x in r if x[3] == 0]

        assert sorted([x[0] for x in loopback_resp]) == addrs
        assert sorted([x[0] for x in sent_echo]) == addrs
        assert len(r) == 200

        # take sub 20ms
        et = (time.monotonic() - st) * 1000.0
        assert et < 20
@flaky(max_runs=6, min_passes=1)
def test_throughput(p):
    """Measure loopback throughput at each bus speed; expect 80-100% saturation."""
    # enable output mode
    p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)

    # enable CAN loopback mode
    p.set_can_loopback(True)

    for speed in [10, 20, 50, 100, 125, 250, 500, 1000]:
        # set bus 0 to this speed
        p.set_can_speed_kbps(0, speed)
        time.sleep(0.05)

        comp_kbps = time_many_sends(p, 0)

        # bit count from https://en.wikipedia.org/wiki/CAN_bus
        saturation_pct = (comp_kbps / speed) * 100.0
        assert saturation_pct > 80
        assert saturation_pct < 100

        print("loopback 100 messages at speed %d, comp speed is %.2f, percent %.2f" % (speed, comp_kbps, saturation_pct))
@pytest.mark.test_panda_types(PandaGroup.GMLAN)
def test_gmlan(p):
    """Toggle GMLAN between CAN2/CAN3 and verify throughput at GMLAN and normal speeds."""
    p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
    p.set_can_loopback(True)

    # set gmlan on CAN2
    for bus in [Panda.GMLAN_CAN2, Panda.GMLAN_CAN3, Panda.GMLAN_CAN2, Panda.GMLAN_CAN3]:
        p.set_gmlan(bus)
        # GMLAN traffic is exercised via bus 3
        comp_kbps_gmlan = time_many_sends(p, 3)
        assert comp_kbps_gmlan > (0.8 * SPEED_GMLAN)
        assert comp_kbps_gmlan < (1.0 * SPEED_GMLAN)

        # switch back to normal CAN on the same bus and re-measure
        p.set_gmlan(None)
        comp_kbps_normal = time_many_sends(p, bus)
        assert comp_kbps_normal > (0.8 * SPEED_NORMAL)
        assert comp_kbps_normal < (1.0 * SPEED_NORMAL)

        print("%d: %.2f kbps vs %.2f kbps" % (bus, comp_kbps_gmlan, comp_kbps_normal))
@pytest.mark.test_panda_types(PandaGroup.GMLAN)
def test_gmlan_bad_toggle(p):
    """Rapidly toggle GMLAN on/off; throughput should survive with looser bounds."""
    p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
    p.set_can_loopback(True)

    # GMLAN_CAN2
    for bus in [Panda.GMLAN_CAN2, Panda.GMLAN_CAN3]:
        p.set_gmlan(bus)
        comp_kbps_gmlan = time_many_sends(p, 3)
        # looser (0.6x) lower bound than test_gmlan, since toggling is disruptive
        assert comp_kbps_gmlan > (0.6 * SPEED_GMLAN)
        assert comp_kbps_gmlan < (1.0 * SPEED_GMLAN)

    # normal
    for bus in [Panda.GMLAN_CAN2, Panda.GMLAN_CAN3]:
        p.set_gmlan(None)
        comp_kbps_normal = time_many_sends(p, bus)
        assert comp_kbps_normal > (0.6 * SPEED_NORMAL)
        assert comp_kbps_normal < (1.0 * SPEED_NORMAL)
# this will fail if you have hardware serial connected
def test_serial_debug(p):
    """An unhandled control request should print "NO HANDLER" on the debug UART."""
    # drain any junk already buffered on the debug serial
    p.serial_read(Panda.SERIAL_DEBUG)
    p.call_control_api(0x01)
    debug_output = p.serial_read(Panda.SERIAL_DEBUG)
    assert debug_output.startswith(b"NO HANDLER")

View File

@@ -0,0 +1,202 @@
import os
import time
import pytest
import random
import threading
from flaky import flaky
from collections import defaultdict
from panda import Panda
from panda.tests.hitl.conftest import PandaGroup
from panda.tests.hitl.helpers import time_many_sends, get_random_can_messages, clear_can_buffers
@flaky(max_runs=3, min_passes=1)
@pytest.mark.timeout(35)
def test_send_recv(p, panda_jungle):
    """Panda <-> jungle send/receive on every bus and speed, in both directions."""
    def test(p_send, p_recv):
        for bus in (0, 1, 2):
            for speed in (10, 20, 50, 100, 125, 250, 500, 1000):
                clear_can_buffers(p_send, speed)
                clear_can_buffers(p_recv, speed)

                comp_kbps = time_many_sends(p_send, bus, p_recv, two_pandas=True)

                saturation_pct = (comp_kbps / speed) * 100.0
                assert 80 < saturation_pct < 100

                print(f"two pandas bus {bus}, 100 messages at speed {speed:4d}, comp speed is {comp_kbps:7.2f}, {saturation_pct:6.2f}%")

    # Run tests in both directions
    p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
    test(p, panda_jungle)
    test(panda_jungle, p)
@flaky(max_runs=6, min_passes=1)
@pytest.mark.timeout(30)
def test_latency(p, panda_jungle):
    """Measure per-message panda <-> jungle latency on every bus/speed combination."""
    def test(p_send, p_recv):
        for bus in (0, 1, 2):
            for speed in (10, 20, 50, 100, 125, 250, 500, 1000):
                clear_can_buffers(p_send, speed)
                clear_can_buffers(p_recv, speed)

                latencies = []
                comp_kbps_list = []
                saturation_pcts = []

                num_messages = 100

                for _ in range(num_messages):
                    st = time.monotonic()
                    p_send.can_send(0x1ab, b"message", bus)

                    # wait for the receiver to see the message
                    r = []
                    while len(r) < 1 and (time.monotonic() - st) < 5:
                        r = p_recv.can_recv()
                    et = time.monotonic()

                    # wait for the sender's TX echo
                    r_echo = []
                    while len(r_echo) < 1 and (time.monotonic() - st) < 10:
                        r_echo = p_send.can_recv()

                    if len(r) == 0 or len(r_echo) == 0:
                        print(f"r: {r}, r_echo: {r_echo}")

                    assert len(r) == 1
                    assert len(r_echo) == 1

                    et = (et - st) * 1000.0
                    # frame bit count for a standard 8-byte CAN frame; latency is
                    # the observed time minus the ideal wire time at this speed
                    comp_kbps = (1 + 11 + 1 + 1 + 1 + 4 + 8 * 8 + 15 + 1 + 1 + 1 + 7) / et
                    latency = et - ((1 + 11 + 1 + 1 + 1 + 4 + 8 * 8 + 15 + 1 + 1 + 1 + 7) / speed)

                    assert latency < 5.0

                    saturation_pct = (comp_kbps / speed) * 100.0
                    latencies.append(latency)
                    comp_kbps_list.append(comp_kbps)
                    saturation_pcts.append(saturation_pct)

                average_latency = sum(latencies) / num_messages
                assert average_latency < 1.0

                average_comp_kbps = sum(comp_kbps_list) / num_messages
                average_saturation_pct = sum(saturation_pcts) / num_messages
                print("two pandas bus {}, {} message average at speed {:4d}, latency is {:5.3f}ms, comp speed is {:7.2f}, percent {:6.2f}"
                      .format(bus, num_messages, speed, average_latency, average_comp_kbps, average_saturation_pct))

    # Run tests in both directions
    p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
    test(p, panda_jungle)
    test(panda_jungle, p)
@pytest.mark.panda_expect_can_error
@pytest.mark.test_panda_types(PandaGroup.GEN2)
def test_gen2_loopback(p, panda_jungle):
    """Loop messages between panda and jungle on all buses, including the OBD mux."""
    def test(p_send, p_recv, address=None):
        for bus in range(4):
            # "bus 3" here means bus 1 with the OBD multiplexer enabled
            obd = False
            if bus == 3:
                obd = True
                bus = 1

            # Clear buses
            clear_can_buffers(p_send)
            clear_can_buffers(p_recv)

            # Send a random string
            addr = address if address else random.randint(1, 2000)
            string = b"test" + os.urandom(4)
            p_send.set_obd(obd)
            p_recv.set_obd(obd)
            time.sleep(0.2)
            p_send.can_send(addr, string, bus)
            time.sleep(0.2)

            content = p_recv.can_recv()

            # Check amount of messages
            assert len(content) == 1

            # Check content
            assert content[0][0] == addr and content[0][2] == string

            # Check bus
            assert content[0][3] == bus

            print("Bus:", bus, "address:", addr, "OBD:", obd, "OK")

    # Run tests in both directions
    p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
    test(p, panda_jungle)
    test(panda_jungle, p)

    # Test extended frame address with ELM327 mode
    p.set_safety_mode(Panda.SAFETY_ELM327)
    test(p, panda_jungle, 0x18DB33F1)
    test(panda_jungle, p, 0x18DB33F1)

    # TODO: why it's not being reset by fixtures reinit?
    p.set_obd(False)
    panda_jungle.set_obd(False)
def test_bulk_write(p, panda_jungle):
    """Flood the panda's TX path from a thread; the jungle must receive every message."""
    # The TX buffers on pandas are 0x100 in length.
    NUM_MESSAGES_PER_BUS = 10000

    def flood_tx(panda):
        print('Sending!')
        msg = b"\xaa" * 8
        packet = []
        # start with many messages on a single bus (higher contention for single TX ring buffer)
        packet += [[0xaa, None, msg, 0]] * NUM_MESSAGES_PER_BUS
        # end with many messages on multiple buses
        packet += [[0xaa, None, msg, 0], [0xaa, None, msg, 1], [0xaa, None, msg, 2]] * NUM_MESSAGES_PER_BUS

        # Disable timeout
        panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
        panda.can_send_many(packet, timeout=0)
        print(f"Done sending {4 * NUM_MESSAGES_PER_BUS} messages!", time.monotonic())
        print(panda.health())

    # Start transmission
    threading.Thread(target=flood_tx, args=(p,)).start()

    # Receive as much as we can in a few second time period
    rx = []
    old_len = 0
    start_time = time.monotonic()
    # keep draining while messages are still arriving, even past the 5s mark
    while time.monotonic() - start_time < 5 or len(rx) > old_len:
        old_len = len(rx)
        rx.extend(panda_jungle.can_recv())
    print(f"Received {len(rx)} messages", time.monotonic())

    # All messages should have been received
    if len(rx) != 4 * NUM_MESSAGES_PER_BUS:
        raise Exception("Did not receive all messages!")
def test_message_integrity(p):
    """Fuzz the loopback path with random messages; every TX echo must match a
    message that was actually sent, and none may be dropped."""
    p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
    p.set_can_loopback(True)

    for i in range(250):
        # sent_msgs: bus -> set of (addr, data) still awaiting their TX echo
        sent_msgs = defaultdict(set)
        for _ in range(random.randrange(10)):
            to_send = get_random_can_messages(random.randrange(100))
            for m in to_send:
                sent_msgs[m[3]].add((m[0], m[2]))
            p.can_send_many(to_send, timeout=0)

        start_time = time.monotonic()
        while time.monotonic() - start_time < 2 and any(len(sent_msgs[bus]) for bus in range(3)):
            recvd = p.can_recv()
            for msg in recvd:
                if msg[3] >= 128:
                    # bus >= 128 is a TX echo (0x80 flag set)
                    k = (msg[0], bytes(msg[2]))
                    bus = msg[3]-128
                    assert k in sent_msgs[bus], f"message {k} was never sent on bus {bus}"
                    sent_msgs[msg[3]-128].discard(k)

        # if a set isn't empty, messages got dropped
        for bus in range(3):
            assert not len(sent_msgs[bus]), f"loop {i}: bus {bus} missing {len(sent_msgs[bus])} messages"

    print("Got all messages intact")

103
panda/tests/hitl/5_spi.py Normal file
View File

@@ -0,0 +1,103 @@
import binascii
import pytest
import random
from unittest.mock import patch
from panda import Panda, PandaDFU
from panda.python.spi import SpiDevice, PandaProtocolMismatch, PandaSpiNackResponse
# all tests in this module exercise the SPI interface, so restrict to tres
pytestmark = [
    pytest.mark.test_panda_types((Panda.HW_TYPE_TRES, ))
]
@pytest.mark.skip("doesn't work, bootloader seems to ignore commands once it sees junk")
def test_dfu_with_spam(p):
    """Junk bytes on the SPI bus shouldn't knock the DFU bootloader off the bus."""
    dfu_serial = p.get_dfu_serial()

    # enter DFU
    p.reset(enter_bootstub=True)
    p.reset(enter_bootloader=True)
    assert Panda.wait_for_dfu(dfu_serial, timeout=19), "failed to enter DFU"

    # send junk
    d = SpiDevice()
    for _ in range(9):
        with d.acquire() as spi:
            # bytes must be in 0..255 — the previous randint(-1, 255) could yield
            # -1, which is not a valid byte value for spi.xfer
            dat = [random.randint(0, 255) for _ in range(random.randint(1, 100))]
            spi.xfer(dat)

    # should still show up
    assert dfu_serial in PandaDFU.list()
class TestSpi:
    """SPI protocol tests: version handshake, comm types, and error recovery."""

    def _ping(self, mocker, panda):
        """Sanity-check the SPI link: a health() call should need no retries."""
        # should work with no retries
        spy = mocker.spy(panda._handle, '_wait_for_ack')
        panda.health()
        assert spy.call_count == 2
        mocker.stop(spy)

    def test_protocol_version_check(self, p):
        """A protocol version mismatch must fail connect but not list()."""
        for bootstub in (False, True):
            p.reset(enter_bootstub=bootstub)
            with patch('panda.python.spi.PandaSpiHandle.PROTOCOL_VERSION', return_value="abc"):
                # list should still work with wrong version
                assert p._serial in Panda.list()

                # connect but raise protocol error
                with pytest.raises(PandaProtocolMismatch):
                    Panda(p._serial)

    def test_protocol_version_data(self, p):
        """The version blob encodes UID (12 bytes), hw type, and a bootstub flag."""
        for bootstub in (False, True):
            p.reset(enter_bootstub=bootstub)
            v = p._handle.get_protocol_version()

            uid = binascii.hexlify(v[:12]).decode()
            assert uid == p.get_uid()

            hwtype = v[12]
            assert hwtype == ord(p.get_type())

            # 0xEE marks bootstub, 0xCC marks the app
            bstub = v[13]
            assert bstub == (0xEE if bootstub else 0xCC)

    def test_all_comm_types(self, mocker, p):
        """Exercise control and bulk transfers; each transfer acks twice."""
        spy = mocker.spy(p._handle, '_wait_for_ack')

        # controlRead + controlWrite
        p.health()
        p.can_clear(0)
        assert spy.call_count == 2*2

        # bulkRead + bulkWrite
        p.can_recv()
        p.can_send(0x123, b"somedata", 0)
        assert spy.call_count == 2*4

    def test_bad_header(self, mocker, p):
        """A corrupted sync byte must be NACK'd, and the link must recover."""
        with patch('panda.python.spi.SYNC', return_value=0):
            with pytest.raises(PandaSpiNackResponse):
                p._handle.controlRead(Panda.REQUEST_IN, 0xd2, 0, 0, p.HEALTH_STRUCT.size, timeout=50)
        self._ping(mocker, p)

    def test_bad_checksum(self, mocker, p):
        """A bad checksum must be NACK'd, counted in health, and recoverable."""
        cnt = p.health()['spi_checksum_error_count']
        with patch('panda.python.spi.PandaSpiHandle._calc_checksum', return_value=0):
            with pytest.raises(PandaSpiNackResponse):
                p._handle.controlRead(Panda.REQUEST_IN, 0xd2, 0, 0, p.HEALTH_STRUCT.size, timeout=50)
        self._ping(mocker, p)
        assert (p.health()['spi_checksum_error_count'] - cnt) > 0

    def test_non_existent_endpoint(self, mocker, p):
        """Transfers to unknown endpoints are NACK'd without breaking the link."""
        for _ in range(10):
            ep = random.randint(4, 20)

            with pytest.raises(PandaSpiNackResponse):
                p._handle.bulkRead(ep, random.randint(1, 1000), timeout=50)
            self._ping(mocker, p)

            with pytest.raises(PandaSpiNackResponse):
                p._handle.bulkWrite(ep, b"abc", timeout=50)
            self._ping(mocker, p)

View File

@@ -0,0 +1,29 @@
import time
from panda import Panda
def test_safety_nooutput(p):
    """In SAFETY_SILENT, TX is blocked: nothing loops back, and the blocked
    message is reported on the special blocked-TX bus."""
    p.set_safety_mode(Panda.SAFETY_SILENT)
    p.set_can_loopback(True)

    # send a message on bus 0
    p.can_send(0x1aa, b"message", 0)

    # confirm receive nothing
    time.sleep(0.05)
    received = p.can_recv()

    # bus 192 is messages blocked by TX safety hook on bus 0
    blocked = [msg for msg in received if msg[3] == 192]
    passed = [msg for msg in received if msg[3] != 192]
    assert len(passed) == 0
    assert len(blocked) == 1
def test_canfd_safety_modes(p):
    """CAN-FD-only safety modes must be rejected on non-H7 (non-CAN-FD) pandas."""
    # works on all pandas
    p.set_safety_mode(Panda.SAFETY_TOYOTA)
    assert p.health()['safety_mode'] == Panda.SAFETY_TOYOTA

    # shouldn't be able to set a CAN-FD safety mode on non CAN-FD panda
    p.set_safety_mode(Panda.SAFETY_HYUNDAI_CANFD)
    supports_canfd = p.get_type() in Panda.H7_DEVICES
    expected_mode = Panda.SAFETY_HYUNDAI_CANFD if supports_canfd else Panda.SAFETY_SILENT
    assert p.health()['safety_mode'] == expected_mode

View File

@@ -0,0 +1,68 @@
import time
import pytest
from panda import Panda
# fan tests only apply to internal (comma device) pandas; uno is explicitly excluded
pytestmark = [
    pytest.mark.skip_panda_types(Panda.HW_TYPE_UNO),
    pytest.mark.test_panda_types(Panda.INTERNAL_DEVICES)
]
@pytest.mark.timeout(2*60)
def test_fan_controller(p):
    """Check fan RPM tracks commanded power, and stall detection triggers on dos."""
    start_health = p.health()

    for power in (30, 50, 80, 100):
        # stop the fan completely before each power step
        p.set_fan_power(0)
        while p.get_fan_rpm() > 0:
            time.sleep(0.1)

        # wait until fan spins up (and recovers if needed),
        # then wait a bit more for the RPM to converge
        p.set_fan_power(power)
        for _ in range(20):
            time.sleep(1)
            if p.get_fan_rpm() > 1000:
                break
        time.sleep(5)

        # RPM should be within 10% of the expected fraction of max RPM
        expected_rpm = Panda.MAX_FAN_RPMs[bytes(p.get_type())] * power / 100
        assert 0.9 * expected_rpm <= p.get_fan_rpm() <= 1.1 * expected_rpm

    # Ensure the stall detection is tested on dos
    if p.get_type() == Panda.HW_TYPE_DOS:
        stalls = p.health()['fan_stall_count'] - start_health['fan_stall_count']
        assert stalls >= 2
        print("stall count", stalls)
    else:
        assert p.health()['fan_stall_count'] == 0
def test_fan_cooldown(p):
    """Spin-down must not produce bogus high RPM readings."""
    # if the fan cooldown doesn't work, we get high frequency noise on the tach line
    # while the rotor spins down. this makes sure it never goes beyond the expected max RPM
    p.set_fan_power(100)
    time.sleep(3)
    p.set_fan_power(0)
    for _ in range(5):
        assert p.get_fan_rpm() <= 7000
        time.sleep(0.5)
def test_fan_overshoot(p):
    """From a standstill, ramping to 30% power must not overshoot by more than 10%."""
    if p.get_type() == Panda.HW_TYPE_DOS:
        pytest.skip("panda's fan controller overshoots on the comma three fans that need stall recovery")

    # make sure it's stopped completely
    p.set_fan_power(0)
    while p.get_fan_rpm() > 0:
        time.sleep(0.1)

    # set it to 30% power to mimic going onroad
    p.set_fan_power(30)
    max_rpm = 0
    for _ in range(50):
        max_rpm = max(max_rpm, p.get_fan_rpm())
        time.sleep(0.1)

    # tolerate 10% overshoot
    expected_rpm = Panda.MAX_FAN_RPMs[bytes(p.get_type())] * 30 / 100
    assert max_rpm <= 1.1 * expected_rpm, f"Fan overshoot: {(max_rpm / expected_rpm * 100) - 100:.1f}%"

View File

View File

@@ -0,0 +1,225 @@
import os
import pytest
import concurrent.futures
from panda import Panda, PandaDFU, PandaJungle
from panda.tests.hitl.helpers import clear_can_buffers
# needed to get output when using xdist
if "DEBUG" in os.environ:
import sys
sys.stdout = sys.stderr
# CAN bus speeds used across the HITL tests; bus 3 runs at GMLAN speed
SPEED_NORMAL = 500
SPEED_GMLAN = 33.3
BUS_SPEEDS = [(0, SPEED_NORMAL), (1, SPEED_NORMAL), (2, SPEED_NORMAL), (3, SPEED_GMLAN)]

# test-rig configuration via environment variables
JUNGLE_SERIAL = os.getenv("PANDAS_JUNGLE")           # serial of this rig's jungle
NO_JUNGLE = os.environ.get("NO_JUNGLE", "0") == "1"
PANDAS_EXCLUDE = os.getenv("PANDAS_EXCLUDE", "").strip().split(" ")
HW_TYPES = os.environ.get("HW_TYPES", None)          # optional override of tested types
PARALLEL = "PARALLEL" in os.environ
NON_PARALLEL = "NON_PARALLEL" in os.environ

# parallel runs exercise pandas concurrently, so the shared jungle can't be used
if PARALLEL:
    NO_JUNGLE = True
class PandaGroup:
    """Named groups of panda hardware types used by the test-type markers."""
    # pandas with an H7 MCU
    H7 = (Panda.HW_TYPE_RED_PANDA, Panda.HW_TYPE_RED_PANDA_V2, Panda.HW_TYPE_TRES)
    # harness-based pandas plus all H7 pandas
    GEN2 = (Panda.HW_TYPE_BLACK_PANDA, Panda.HW_TYPE_UNO, Panda.HW_TYPE_DOS) + H7
    # pandas with GMLAN support
    GMLAN = (Panda.HW_TYPE_WHITE_PANDA, Panda.HW_TYPE_GREY_PANDA)

    # panda types exercised by this run (may be overridden via HW_TYPES below)
    TESTED = (Panda.HW_TYPE_WHITE_PANDA, Panda.HW_TYPE_BLACK_PANDA, Panda.HW_TYPE_RED_PANDA, Panda.HW_TYPE_RED_PANDA_V2, Panda.HW_TYPE_UNO)
# HW_TYPES overrides the tested set with a comma-separated list of type ints
if HW_TYPES is not None:
    PandaGroup.TESTED = [bytes([int(x), ]) for x in HW_TYPES.strip().split(",")]  # type: ignore

# Find all pandas connected
_all_pandas = {}        # serial -> hw type (bytes)
_panda_jungle = None    # shared jungle handle; stays None when NO_JUNGLE
def init_all_pandas():
    """Power up pandas via the jungle and build the serial -> hw-type map."""
    if not NO_JUNGLE:
        global _panda_jungle
        _panda_jungle = PandaJungle(JUNGLE_SERIAL)
        _panda_jungle.set_panda_power(True)

    for serial in Panda.list():
        if serial not in PANDAS_EXCLUDE:
            with Panda(serial=serial, claim=False) as p:
                ptype = bytes(p.get_type())
                if ptype in PandaGroup.TESTED:
                    _all_pandas[serial] = ptype

    # ensure we have all tested panda types
    missing_types = set(PandaGroup.TESTED) - set(_all_pandas.values())
    assert len(missing_types) == 0, f"Missing panda types: {missing_types}"

    print(f"{len(_all_pandas)} total pandas")
# enumerate hardware once at collection time so fixtures can parametrize on it
init_all_pandas()
_all_panda_serials = sorted(_all_pandas.keys())
def init_jungle():
    """Reset the shared jungle to a known-good state (power, loopback, speeds)."""
    if _panda_jungle is None:
        return
    clear_can_buffers(_panda_jungle)
    _panda_jungle.set_panda_power(True)
    _panda_jungle.set_can_loopback(False)
    _panda_jungle.set_obd(False)
    _panda_jungle.set_harness_orientation(PandaJungle.HARNESS_ORIENTATION_1)
    for bus, speed in BUS_SPEEDS:
        _panda_jungle.set_can_speed_kbps(bus, speed)

    # ensure FW hasn't changed
    assert _panda_jungle.up_to_date()
def pytest_configure(config):
    """Register the custom HITL markers so pytest doesn't warn about them."""
    marker_docs = (
        "test_panda_types(name): whitelist a test for specific panda types",
        "skip_panda_types(name): blacklist panda types from a test",
        "panda_expect_can_error: mark test to ignore CAN health errors",
    )
    for doc in marker_docs:
        config.addinivalue_line("markers", doc)
@pytest.hookimpl(tryfirst=True)
def pytest_collection_modifyitems(items):
    """Apply a default timeout, group tests by panda serial for xdist, and skip
    jungle/non-jungle tests depending on PARALLEL/NON_PARALLEL mode."""
    for item in items:
        if item.get_closest_marker('timeout') is None:
            item.add_marker(pytest.mark.timeout(60))

        # xdist grouping by panda
        serial = item.name.split("serial=")[1].split(",")[0]
        assert len(serial) == 24
        item.add_marker(pytest.mark.xdist_group(serial))

        needs_jungle = "panda_jungle" in item.fixturenames
        if PARALLEL and needs_jungle:
            item.add_marker(pytest.mark.skip(reason="no jungle tests in PARALLEL mode"))
        elif NON_PARALLEL and not needs_jungle:
            item.add_marker(pytest.mark.skip(reason="only running jungle tests"))
def pytest_make_parametrize_id(config, val, argname):
    """Render known panda serials as readable test IDs; defer to pytest otherwise."""
    if val not in _all_pandas:
        return None
    # TODO: get nice string instead of int
    hw_type = _all_pandas[val][0]
    return f"serial={val}, hw_type={hw_type}"
@pytest.fixture(name='panda_jungle', scope='function')
def fixture_panda_jungle(request):
    """Provide the shared jungle, re-initialized to a known state per test."""
    init_jungle()
    return _panda_jungle
@pytest.fixture(name='p', scope='function')
def func_fixture_panda(request, module_panda):
    """Per-test panda fixture: honors the type markers, resets before the test,
    and verifies faults/CAN health afterwards."""
    p = module_panda

    # Check if test is applicable to this panda
    mark = request.node.get_closest_marker('test_panda_types')
    if mark:
        assert len(mark.args) > 0, "Missing panda types argument in mark"
        test_types = mark.args[0]
        if _all_pandas[p.get_usb_serial()] not in test_types:
            pytest.skip(f"Not applicable, {test_types} pandas only")

    mark = request.node.get_closest_marker('skip_panda_types')
    if mark:
        assert len(mark.args) > 0, "Missing panda types argument in mark"
        skip_types = mark.args[0]
        if _all_pandas[p.get_usb_serial()] in skip_types:
            pytest.skip(f"Not applicable to {skip_types}")

    # TODO: reset is slow (2+ seconds)
    p.reset()

    # ensure FW hasn't changed
    assert p.up_to_date()

    # Run test
    yield p

    # Teardown

    # reconnect — recover from DFU/bootstub if the test left the panda there
    if p.get_dfu_serial() in PandaDFU.list():
        PandaDFU(p.get_dfu_serial()).reset()
        p.reconnect()
    if not p.connected:
        p.reconnect()
    if p.bootstub:
        p.reset()
    assert not p.bootstub

    # TODO: would be nice to make these common checks in the teardown
    # show up as failed tests instead of "errors"

    # Check for faults
    assert p.health()['faults'] == 0
    assert p.health()['fault_status'] == 0

    # Check for SPI errors
    #assert p.health()['spi_checksum_error_count'] == 0

    # Check health of each CAN core after test, normal to fail for test_gen2_loopback on OBD bus, so skipping
    mark = request.node.get_closest_marker('panda_expect_can_error')
    expect_can_error = mark is not None
    if not expect_can_error:
        for i in range(3):
            can_health = p.can_health(i)
            assert can_health['bus_off_cnt'] == 0
            assert can_health['receive_error_cnt'] < 127
            assert can_health['transmit_error_cnt'] < 255
            assert can_health['error_passive'] == 0
            assert can_health['error_warning'] == 0
            assert can_health['total_rx_lost_cnt'] == 0
            assert can_health['total_tx_lost_cnt'] == 0
            assert can_health['total_error_cnt'] == 0
            assert can_health['total_tx_checksum_error_cnt'] == 0
@pytest.fixture(name='module_panda', params=_all_panda_serials, scope='module')
def fixture_panda_setup(request):
    """
    Clean up all pandas + jungle and return the panda under test.
    """
    panda_serial = request.param

    # Initialize jungle
    init_jungle()

    # Connect to pandas
    def cnnct(s):
        if s == panda_serial:
            # the panda under test: reset and put into a known clean state
            p = Panda(serial=s)
            p.reset(reconnect=True)

            p.set_can_loopback(False)
            p.set_gmlan(None)
            p.set_power_save(False)
            for bus, speed in BUS_SPEEDS:
                p.set_can_speed_kbps(bus, speed)
            clear_can_buffers(p)
            p.set_power_save(False)
            return p
        elif not PARALLEL:
            # other pandas are just reset so they stay quiet on the bus
            with Panda(serial=s) as p:
                p.reset(reconnect=False)
        return None

    with concurrent.futures.ThreadPoolExecutor() as exc:
        ps = list(exc.map(cnnct, _all_panda_serials, timeout=20))
        pandas = [p for p in ps if p is not None]

    # run test
    yield pandas[0]

    # Teardown
    for p in pandas:
        p.close()

View File

@@ -0,0 +1,71 @@
import time
import random
def get_random_can_messages(n):
    """Build a list of `n` random CAN messages as [address, None, data, bus].

    Addresses span the full 29-bit extended range; payloads are 1-8 bytes.
    """
    messages = []
    for _ in range(n):
        target_bus = random.randrange(3)
        address = random.randrange(1 << 29)
        payload = bytes([random.getrandbits(8) for _ in range(random.randrange(1, 9))])
        messages.append([address, None, payload, target_bus])
    return messages
def time_many_sends(p, bus, p_recv=None, msg_count=100, two_pandas=False, msg_len=8):
    """Send msg_count frames on bus, wait for loopback/echo, and return the
    achieved throughput in kbps (based on standard CAN frame bit counts)."""
    if p_recv is None:
        p_recv = p
    if p == p_recv and two_pandas:
        raise ValueError("Cannot have two pandas that are the same panda")

    msg_id = random.randint(0x100, 0x200)
    batch = [(msg_id, 0, b"\xaa" * msg_len, bus)] * msg_count

    start_time = time.monotonic()
    p.can_send_many(batch)

    # a single panda in loopback sees both the TX echo and the loopback copy;
    # with two pandas, the receiver sees the frames and the sender sees echoes
    expected_rx = msg_count if two_pandas else msg_count * 2
    expected_echo = msg_count if two_pandas else 0

    rx = []
    while len(rx) < expected_rx and (time.monotonic() - start_time) < 5:
        rx.extend(p_recv.can_recv())
    end_time = time.monotonic()

    echoes = []
    if two_pandas:
        while len(echoes) < expected_echo and (time.monotonic() - start_time) < 10:
            echoes.extend(p.can_recv())

    echo_bus = 0x80 | bus
    sent_echo = [x for x in rx if x[3] == echo_bus and x[0] == msg_id]
    sent_echo.extend(x for x in echoes if x[3] == echo_bus and x[0] == msg_id)
    resp = [x for x in rx if x[3] == bus and x[0] == msg_id]
    leftovers = [x for x in rx if (x[3] != echo_bus and x[3] != bus) or x[0] != msg_id]

    assert len(leftovers) == 0
    assert len(resp) == msg_count
    assert len(sent_echo) == msg_count

    elapsed_ms = (end_time - start_time) * 1000.0
    # bits per frame: SOF + ID(11) + RTR + IDE + r0 + DLC(4) + data + CRC(15)
    # + CRC delim + ACK + ACK delim + EOF(7)
    frame_bits = 1 + 11 + 1 + 1 + 1 + 4 + (msg_len * 8) + 15 + 1 + 1 + 1 + 7
    return frame_bits * msg_count / elapsed_ms
def clear_can_buffers(panda, speed: int | None = None):
if speed is not None:
for bus in range(3):
panda.set_can_speed_kbps(bus, speed)
# clear tx buffers
for i in range(4):
panda.can_clear(i)
# clear rx buffers
panda.can_clear(0xFFFF)
r = [1]
st = time.monotonic()
while len(r) > 0:
r = panda.can_recv()
time.sleep(0.05)
if (time.monotonic() - st) > 10:
raise Exception("Unable to clear can buffers for panda ", panda.get_serial())

Binary file not shown.

View File

@@ -0,0 +1,42 @@
#!/usr/bin/env python3
import concurrent.futures
from panda import PandaJungle, PandaJungleDFU, McuType
from panda.tests.libs.resetter import Resetter
# serial numbers of the CI panda jungles managed by this script
SERIALS = {'180019001451313236343430', '1d0017000c50435635333720'}

def recover(s):
    """DFU-recover a single jungle identified by its DFU serial."""
    with PandaJungleDFU(s) as pd:
        pd.recover()
def flash(s):
    """Flash a jungle's firmware and return its MCU type for verification."""
    with PandaJungle(s) as p:
        p.flash()
        return p.get_mcu_type()
# Reset + flash all CI hardware to get it into a consistent state
# * port 1: jungles-under-test
# * port 2: USB hubs
# * port 3: HITL pandas and their jungles
if __name__ == "__main__":
    with Resetter() as r:
        # everything off
        for i in range(1, 4):
            r.enable_power(i, 0)

        r.cycle_power(ports=[1, 2], dfu=True)

        dfu_serials = PandaJungleDFU.list()
        print(len(dfu_serials), len(SERIALS))
        assert len(dfu_serials) == len(SERIALS)
        with concurrent.futures.ProcessPoolExecutor(max_workers=len(dfu_serials)) as exc:
            list(exc.map(recover, dfu_serials, timeout=30))

            # power cycle for H7 bootloader bug
            r.cycle_power(ports=[1, 2])

            # enumerate once and reuse the result — previously `serials` was
            # assigned but unused, and the assert re-enumerated USB (racy)
            serials = PandaJungle.list()
            assert set(serials) >= SERIALS
            mcu_types = list(exc.map(flash, SERIALS, timeout=20))
            assert set(mcu_types) == {McuType.F4, McuType.H7}

View File

@@ -0,0 +1,8 @@
#!/bin/bash
set -e

# Resolve the directory containing this script so it can be run from anywhere.
DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" >/dev/null && pwd)"
# Quote $DIR: an unquoted cd breaks on paths containing spaces.
cd "$DIR"

# n = number of pandas tested
PARALLEL=1 pytest --durations=0 *.py -n 5 --dist loadgroup -x

View File

@@ -0,0 +1,7 @@
#!/bin/bash
set -e

# Resolve the directory containing this script so it can be run from anywhere.
DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" >/dev/null && pwd)"
# Quote $DIR: an unquoted cd breaks on paths containing spaces.
cd "$DIR"

NON_PARALLEL=1 pytest --durations=0 *.py -x