This commit is contained in:
Your Name
2024-04-27 03:24:56 -05:00
parent 680e909d73
commit 4401b71958
21 changed files with 1983 additions and 6932 deletions

View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,135 @@
#define CATCH_CONFIG_MAIN
#define CATCH_CONFIG_ENABLE_BENCHMARKING
#include "catch2/catch.hpp"
#include "cereal/messaging/messaging.h"
#include "common/util.h"
#include "selfdrive/boardd/panda.h"
struct PandaTest : public Panda {
PandaTest(uint32_t bus_offset, int can_list_size, cereal::PandaState::PandaType hw_type);
void test_can_send();
void test_can_recv(uint32_t chunk_size = 0);
void test_chunked_can_recv();
std::map<int, std::string> test_data;
int can_list_size = 0;
int total_pakets_size = 0;
MessageBuilder msg;
capnp::List<cereal::CanData>::Reader can_data_list;
};
PandaTest::PandaTest(uint32_t bus_offset_, int can_list_size, cereal::PandaState::PandaType hw_type) : can_list_size(can_list_size), Panda(bus_offset_) {
this->hw_type = hw_type;
int data_limit = ((hw_type == cereal::PandaState::PandaType::RED_PANDA) ? std::size(dlc_to_len) : 8);
// prepare test data
for (int i = 0; i < data_limit; ++i) {
std::random_device rd;
std::independent_bits_engine<std::default_random_engine, CHAR_BIT, unsigned char> rbe(rd());
int data_len = dlc_to_len[i];
std::string bytes(data_len, '\0');
std::generate(bytes.begin(), bytes.end(), std::ref(rbe));
test_data[data_len] = bytes;
}
// generate can messages for this panda
auto can_list = msg.initEvent().initSendcan(can_list_size);
for (uint8_t i = 0; i < can_list_size; ++i) {
auto can = can_list[i];
uint32_t id = util::random_int(0, std::size(dlc_to_len) - 1);
const std::string &dat = test_data[dlc_to_len[id]];
can.setAddress(i);
can.setSrc(util::random_int(0, 3) + bus_offset);
can.setDat(kj::ArrayPtr((uint8_t *)dat.data(), dat.size()));
total_pakets_size += sizeof(can_header) + dat.size();
}
can_data_list = can_list.asReader();
INFO("test " << can_list_size << " packets, total size " << total_pakets_size);
}
void PandaTest::test_can_send() {
std::vector<uint8_t> unpacked_data;
this->pack_can_buffer(can_data_list, [&](uint8_t *chunk, size_t size) {
unpacked_data.insert(unpacked_data.end(), chunk, &chunk[size]);
});
REQUIRE(unpacked_data.size() == total_pakets_size);
int cnt = 0;
INFO("test can message integrity");
for (int pos = 0, pckt_len = 0; pos < unpacked_data.size(); pos += pckt_len) {
can_header header;
memcpy(&header, &unpacked_data[pos], sizeof(can_header));
const uint8_t data_len = dlc_to_len[header.data_len_code];
pckt_len = sizeof(can_header) + data_len;
REQUIRE(header.addr == cnt);
REQUIRE(test_data.find(data_len) != test_data.end());
const std::string &dat = test_data[data_len];
REQUIRE(memcmp(dat.data(), &unpacked_data[pos + sizeof(can_header)], dat.size()) == 0);
++cnt;
}
REQUIRE(cnt == can_list_size);
}
void PandaTest::test_can_recv(uint32_t rx_chunk_size) {
std::vector<can_frame> frames;
this->pack_can_buffer(can_data_list, [&](uint8_t *data, uint32_t size) {
if (rx_chunk_size == 0) {
REQUIRE(this->unpack_can_buffer(data, size, frames));
} else {
this->receive_buffer_size = 0;
uint32_t pos = 0;
while (pos < size) {
uint32_t chunk_size = std::min(rx_chunk_size, size - pos);
memcpy(&this->receive_buffer[this->receive_buffer_size], &data[pos], chunk_size);
this->receive_buffer_size += chunk_size;
pos += chunk_size;
REQUIRE(this->unpack_can_buffer(this->receive_buffer, this->receive_buffer_size, frames));
}
}
});
REQUIRE(frames.size() == can_list_size);
for (int i = 0; i < frames.size(); ++i) {
REQUIRE(frames[i].address == i);
REQUIRE(test_data.find(frames[i].dat.size()) != test_data.end());
const std::string &dat = test_data[frames[i].dat.size()];
REQUIRE(memcmp(dat.data(), frames[i].dat.data(), dat.size()) == 0);
}
}
TEST_CASE("send/recv CAN 2.0 packets") {
auto bus_offset = GENERATE(0, 4);
auto can_list_size = GENERATE(1, 3, 5, 10, 30, 60, 100, 200);
PandaTest test(bus_offset, can_list_size, cereal::PandaState::PandaType::DOS);
SECTION("can_send") {
test.test_can_send();
}
SECTION("can_receive") {
test.test_can_recv();
}
SECTION("chunked_can_receive") {
test.test_can_recv(0x40);
}
}
TEST_CASE("send/recv CAN FD packets") {
auto bus_offset = GENERATE(0, 4);
auto can_list_size = GENERATE(1, 3, 5, 10, 30, 60, 100, 200);
PandaTest test(bus_offset, can_list_size, cereal::PandaState::PandaType::RED_PANDA);
SECTION("can_send") {
test.test_can_send();
}
SECTION("can_receive") {
test.test_can_recv();
}
SECTION("chunked_can_receive") {
test.test_can_recv(0x40);
}
}

View File

@@ -0,0 +1,119 @@
#!/usr/bin/env python3
import os
import pytest
import time
import unittest
import cereal.messaging as messaging
from cereal import log
from openpilot.common.gpio import gpio_set, gpio_init
from panda import Panda, PandaDFU, PandaProtocolMismatch
from openpilot.selfdrive.manager.process_config import managed_processes
from openpilot.system.hardware import HARDWARE
from openpilot.system.hardware.tici.pins import GPIO
HERE = os.path.dirname(os.path.realpath(__file__))
@pytest.mark.tici
class TestPandad(unittest.TestCase):
def setUp(self):
# ensure panda is up
if len(Panda.list()) == 0:
self._run_test(60)
def tearDown(self):
managed_processes['pandad'].stop()
def _run_test(self, timeout=30):
managed_processes['pandad'].start()
sm = messaging.SubMaster(['peripheralState'])
for _ in range(timeout*10):
sm.update(100)
if sm['peripheralState'].pandaType != log.PandaState.PandaType.unknown:
break
managed_processes['pandad'].stop()
if sm['peripheralState'].pandaType == log.PandaState.PandaType.unknown:
raise Exception("boardd failed to start")
def _go_to_dfu(self):
HARDWARE.recover_internal_panda()
assert Panda.wait_for_dfu(None, 10)
def _assert_no_panda(self):
assert not Panda.wait_for_dfu(None, 3)
assert not Panda.wait_for_panda(None, 3)
def _flash_bootstub_and_test(self, fn, expect_mismatch=False):
self._go_to_dfu()
pd = PandaDFU(None)
if fn is None:
fn = os.path.join(HERE, pd.get_mcu_type().config.bootstub_fn)
with open(fn, "rb") as f:
pd.program_bootstub(f.read())
pd.reset()
HARDWARE.reset_internal_panda()
assert Panda.wait_for_panda(None, 10)
if expect_mismatch:
with self.assertRaises(PandaProtocolMismatch):
Panda()
else:
with Panda() as p:
assert p.bootstub
self._run_test(45)
def test_in_dfu(self):
HARDWARE.recover_internal_panda()
self._run_test(60)
def test_in_bootstub(self):
with Panda() as p:
p.reset(enter_bootstub=True)
assert p.bootstub
self._run_test()
def test_internal_panda_reset(self):
gpio_init(GPIO.STM_RST_N, True)
gpio_set(GPIO.STM_RST_N, 1)
time.sleep(0.5)
assert all(not Panda(s).is_internal() for s in Panda.list())
self._run_test()
assert any(Panda(s).is_internal() for s in Panda.list())
def test_best_case_startup_time(self):
# run once so we're setup
self._run_test(60)
# should be fast this time
self._run_test(8)
def test_protocol_version_check(self):
if HARDWARE.get_device_type() == 'tici':
raise unittest.SkipTest("SPI test")
# flash old fw
fn = os.path.join(HERE, "bootstub.panda_h7_spiv0.bin")
self._flash_bootstub_and_test(fn, expect_mismatch=True)
def test_release_to_devel_bootstub(self):
self._flash_bootstub_and_test(None)
def test_recover_from_bad_bootstub(self):
self._go_to_dfu()
with PandaDFU(None) as pd:
pd.program_bootstub(b"\x00"*1024)
pd.reset()
HARDWARE.reset_internal_panda()
self._assert_no_panda()
self._run_test(60)
if __name__ == "__main__":
unittest.main()