wip
This commit is contained in:
1
selfdrive/locationd/test/.gitignore
vendored
Normal file
1
selfdrive/locationd/test/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
out/
|
||||
0
selfdrive/locationd/test/__init__.py
Normal file
0
selfdrive/locationd/test/__init__.py
Normal file
117
selfdrive/locationd/test/test_calibrationd.py
Normal file
117
selfdrive/locationd/test/test_calibrationd.py
Normal file
@@ -0,0 +1,117 @@
|
||||
#!/usr/bin/env python3
|
||||
import random
|
||||
import unittest
|
||||
|
||||
import numpy as np
|
||||
|
||||
import cereal.messaging as messaging
|
||||
from cereal import log
|
||||
from openpilot.common.params import Params
|
||||
from openpilot.selfdrive.locationd.calibrationd import Calibrator, INPUTS_NEEDED, INPUTS_WANTED, BLOCK_SIZE, MIN_SPEED_FILTER, \
|
||||
MAX_YAW_RATE_FILTER, SMOOTH_CYCLES, HEIGHT_INIT, MAX_ALLOWED_PITCH_SPREAD, MAX_ALLOWED_YAW_SPREAD
|
||||
|
||||
|
||||
def process_messages(c, cam_odo_calib, cycles,
|
||||
cam_odo_speed=MIN_SPEED_FILTER + 1,
|
||||
carstate_speed=MIN_SPEED_FILTER + 1,
|
||||
cam_odo_yr=0.0,
|
||||
cam_odo_speed_std=1e-3,
|
||||
cam_odo_height_std=1e-3):
|
||||
old_rpy_weight_prev = 0.0
|
||||
for _ in range(cycles):
|
||||
assert (old_rpy_weight_prev - c.old_rpy_weight < 1/SMOOTH_CYCLES + 1e-3)
|
||||
old_rpy_weight_prev = c.old_rpy_weight
|
||||
c.handle_v_ego(carstate_speed)
|
||||
c.handle_cam_odom([cam_odo_speed,
|
||||
np.sin(cam_odo_calib[2]) * cam_odo_speed,
|
||||
-np.sin(cam_odo_calib[1]) * cam_odo_speed],
|
||||
[0.0, 0.0, cam_odo_yr],
|
||||
[0.0, 0.0, 0.0],
|
||||
[cam_odo_speed_std, cam_odo_speed_std, cam_odo_speed_std],
|
||||
[0.0, 0.0, HEIGHT_INIT.item()],
|
||||
[cam_odo_height_std, cam_odo_height_std, cam_odo_height_std])
|
||||
|
||||
class TestCalibrationd(unittest.TestCase):
|
||||
|
||||
def test_read_saved_params(self):
|
||||
msg = messaging.new_message('liveCalibration')
|
||||
msg.liveCalibration.validBlocks = random.randint(1, 10)
|
||||
msg.liveCalibration.rpyCalib = [random.random() for _ in range(3)]
|
||||
msg.liveCalibration.height = [random.random() for _ in range(1)]
|
||||
Params().put("CalibrationParams", msg.to_bytes())
|
||||
c = Calibrator(param_put=True)
|
||||
|
||||
np.testing.assert_allclose(msg.liveCalibration.rpyCalib, c.rpy)
|
||||
np.testing.assert_allclose(msg.liveCalibration.height, c.height)
|
||||
self.assertEqual(msg.liveCalibration.validBlocks, c.valid_blocks)
|
||||
|
||||
|
||||
def test_calibration_basics(self):
|
||||
c = Calibrator(param_put=False)
|
||||
process_messages(c, [0.0, 0.0, 0.0], BLOCK_SIZE * INPUTS_WANTED)
|
||||
self.assertEqual(c.valid_blocks, INPUTS_WANTED)
|
||||
np.testing.assert_allclose(c.rpy, np.zeros(3))
|
||||
np.testing.assert_allclose(c.height, HEIGHT_INIT)
|
||||
c.reset()
|
||||
|
||||
|
||||
def test_calibration_low_speed_reject(self):
|
||||
c = Calibrator(param_put=False)
|
||||
process_messages(c, [0.0, 0.0, 0.0], BLOCK_SIZE * INPUTS_WANTED, cam_odo_speed=MIN_SPEED_FILTER - 1)
|
||||
process_messages(c, [0.0, 0.0, 0.0], BLOCK_SIZE * INPUTS_WANTED, carstate_speed=MIN_SPEED_FILTER - 1)
|
||||
self.assertEqual(c.valid_blocks, 0)
|
||||
np.testing.assert_allclose(c.rpy, np.zeros(3))
|
||||
np.testing.assert_allclose(c.height, HEIGHT_INIT)
|
||||
|
||||
|
||||
def test_calibration_yaw_rate_reject(self):
|
||||
c = Calibrator(param_put=False)
|
||||
process_messages(c, [0.0, 0.0, 0.0], BLOCK_SIZE * INPUTS_WANTED, cam_odo_yr=MAX_YAW_RATE_FILTER)
|
||||
self.assertEqual(c.valid_blocks, 0)
|
||||
np.testing.assert_allclose(c.rpy, np.zeros(3))
|
||||
np.testing.assert_allclose(c.height, HEIGHT_INIT)
|
||||
|
||||
|
||||
def test_calibration_speed_std_reject(self):
|
||||
c = Calibrator(param_put=False)
|
||||
process_messages(c, [0.0, 0.0, 0.0], BLOCK_SIZE * INPUTS_WANTED, cam_odo_speed_std=1e3)
|
||||
self.assertEqual(c.valid_blocks, INPUTS_NEEDED)
|
||||
np.testing.assert_allclose(c.rpy, np.zeros(3))
|
||||
|
||||
|
||||
def test_calibration_speed_std_height_reject(self):
|
||||
c = Calibrator(param_put=False)
|
||||
process_messages(c, [0.0, 0.0, 0.0], BLOCK_SIZE * INPUTS_WANTED, cam_odo_height_std=1e3)
|
||||
self.assertEqual(c.valid_blocks, INPUTS_NEEDED)
|
||||
np.testing.assert_allclose(c.rpy, np.zeros(3))
|
||||
|
||||
|
||||
def test_calibration_auto_reset(self):
|
||||
c = Calibrator(param_put=False)
|
||||
process_messages(c, [0.0, 0.0, 0.0], BLOCK_SIZE * INPUTS_NEEDED)
|
||||
self.assertEqual(c.valid_blocks, INPUTS_NEEDED)
|
||||
np.testing.assert_allclose(c.rpy, [0.0, 0.0, 0.0], atol=1e-3)
|
||||
process_messages(c, [0.0, MAX_ALLOWED_PITCH_SPREAD*0.9, MAX_ALLOWED_YAW_SPREAD*0.9], BLOCK_SIZE + 10)
|
||||
self.assertEqual(c.valid_blocks, INPUTS_NEEDED + 1)
|
||||
self.assertEqual(c.cal_status, log.LiveCalibrationData.Status.calibrated)
|
||||
|
||||
c = Calibrator(param_put=False)
|
||||
process_messages(c, [0.0, 0.0, 0.0], BLOCK_SIZE * INPUTS_NEEDED)
|
||||
self.assertEqual(c.valid_blocks, INPUTS_NEEDED)
|
||||
np.testing.assert_allclose(c.rpy, [0.0, 0.0, 0.0])
|
||||
process_messages(c, [0.0, MAX_ALLOWED_PITCH_SPREAD*1.1, 0.0], BLOCK_SIZE + 10)
|
||||
self.assertEqual(c.valid_blocks, 1)
|
||||
self.assertEqual(c.cal_status, log.LiveCalibrationData.Status.recalibrating)
|
||||
np.testing.assert_allclose(c.rpy, [0.0, MAX_ALLOWED_PITCH_SPREAD*1.1, 0.0], atol=1e-2)
|
||||
|
||||
c = Calibrator(param_put=False)
|
||||
process_messages(c, [0.0, 0.0, 0.0], BLOCK_SIZE * INPUTS_NEEDED)
|
||||
self.assertEqual(c.valid_blocks, INPUTS_NEEDED)
|
||||
np.testing.assert_allclose(c.rpy, [0.0, 0.0, 0.0])
|
||||
process_messages(c, [0.0, 0.0, MAX_ALLOWED_YAW_SPREAD*1.1], BLOCK_SIZE + 10)
|
||||
self.assertEqual(c.valid_blocks, 1)
|
||||
self.assertEqual(c.cal_status, log.LiveCalibrationData.Status.recalibrating)
|
||||
np.testing.assert_allclose(c.rpy, [0.0, 0.0, MAX_ALLOWED_YAW_SPREAD*1.1], atol=1e-2)
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
95
selfdrive/locationd/test/test_locationd.py
Normal file
95
selfdrive/locationd/test/test_locationd.py
Normal file
@@ -0,0 +1,95 @@
|
||||
#!/usr/bin/env python3
|
||||
import json
|
||||
import random
|
||||
import unittest
|
||||
import time
|
||||
import capnp
|
||||
|
||||
import cereal.messaging as messaging
|
||||
from cereal.services import SERVICE_LIST
|
||||
from openpilot.common.params import Params
|
||||
from openpilot.common.transformations.coordinates import ecef2geodetic
|
||||
|
||||
from openpilot.selfdrive.manager.process_config import managed_processes
|
||||
|
||||
|
||||
class TestLocationdProc(unittest.TestCase):
|
||||
LLD_MSGS = ['gpsLocationExternal', 'cameraOdometry', 'carState', 'liveCalibration',
|
||||
'accelerometer', 'gyroscope', 'magnetometer']
|
||||
|
||||
def setUp(self):
|
||||
random.seed(123489234)
|
||||
|
||||
self.pm = messaging.PubMaster(self.LLD_MSGS)
|
||||
|
||||
self.params = Params()
|
||||
self.params.put_bool("UbloxAvailable", True)
|
||||
managed_processes['locationd'].prepare()
|
||||
managed_processes['locationd'].start()
|
||||
|
||||
def tearDown(self):
|
||||
managed_processes['locationd'].stop()
|
||||
|
||||
def get_msg(self, name, t):
|
||||
try:
|
||||
msg = messaging.new_message(name)
|
||||
except capnp.lib.capnp.KjException:
|
||||
msg = messaging.new_message(name, 0)
|
||||
|
||||
if name == "gpsLocationExternal":
|
||||
msg.gpsLocationExternal.flags = 1
|
||||
msg.gpsLocationExternal.hasFix = True
|
||||
msg.gpsLocationExternal.verticalAccuracy = 1.0
|
||||
msg.gpsLocationExternal.speedAccuracy = 1.0
|
||||
msg.gpsLocationExternal.bearingAccuracyDeg = 1.0
|
||||
msg.gpsLocationExternal.vNED = [0.0, 0.0, 0.0]
|
||||
msg.gpsLocationExternal.latitude = float(self.lat)
|
||||
msg.gpsLocationExternal.longitude = float(self.lon)
|
||||
msg.gpsLocationExternal.unixTimestampMillis = t * 1e6
|
||||
msg.gpsLocationExternal.altitude = float(self.alt)
|
||||
#if name == "gnssMeasurements":
|
||||
# msg.gnssMeasurements.measTime = t
|
||||
# msg.gnssMeasurements.positionECEF.value = [self.x , self.y, self.z]
|
||||
# msg.gnssMeasurements.positionECEF.std = [0,0,0]
|
||||
# msg.gnssMeasurements.positionECEF.valid = True
|
||||
# msg.gnssMeasurements.velocityECEF.value = []
|
||||
# msg.gnssMeasurements.velocityECEF.std = [0,0,0]
|
||||
# msg.gnssMeasurements.velocityECEF.valid = True
|
||||
elif name == 'cameraOdometry':
|
||||
msg.cameraOdometry.rot = [0.0, 0.0, 0.0]
|
||||
msg.cameraOdometry.rotStd = [0.0, 0.0, 0.0]
|
||||
msg.cameraOdometry.trans = [0.0, 0.0, 0.0]
|
||||
msg.cameraOdometry.transStd = [0.0, 0.0, 0.0]
|
||||
msg.logMonoTime = t
|
||||
msg.valid = True
|
||||
return msg
|
||||
|
||||
def test_params_gps(self):
|
||||
self.params.remove('LastGPSPosition')
|
||||
|
||||
self.x = -2710700 + (random.random() * 1e5)
|
||||
self.y = -4280600 + (random.random() * 1e5)
|
||||
self.z = 3850300 + (random.random() * 1e5)
|
||||
self.lat, self.lon, self.alt = ecef2geodetic([self.x, self.y, self.z])
|
||||
|
||||
# get fake messages at the correct frequency, listed in services.py
|
||||
msgs = []
|
||||
for sec in range(65):
|
||||
for name in self.LLD_MSGS:
|
||||
for j in range(int(SERVICE_LIST[name].frequency)):
|
||||
msgs.append(self.get_msg(name, int((sec + j / SERVICE_LIST[name].frequency) * 1e9)))
|
||||
|
||||
for msg in sorted(msgs, key=lambda x: x.logMonoTime):
|
||||
self.pm.send(msg.which(), msg)
|
||||
if msg.which() == "cameraOdometry":
|
||||
self.pm.wait_for_readers_to_update(msg.which(), 0.1, dt=0.005)
|
||||
time.sleep(1) # wait for async params write
|
||||
|
||||
lastGPS = json.loads(self.params.get('LastGPSPosition'))
|
||||
self.assertAlmostEqual(lastGPS['latitude'], self.lat, places=3)
|
||||
self.assertAlmostEqual(lastGPS['longitude'], self.lon, places=3)
|
||||
self.assertAlmostEqual(lastGPS['altitude'], self.alt, places=3)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
229
selfdrive/locationd/test/test_locationd_scenarios.py
Normal file
229
selfdrive/locationd/test/test_locationd_scenarios.py
Normal file
@@ -0,0 +1,229 @@
|
||||
#!/usr/bin/env python3
|
||||
import pytest
|
||||
import unittest
|
||||
import numpy as np
|
||||
from collections import defaultdict
|
||||
from enum import Enum
|
||||
|
||||
from openpilot.tools.lib.logreader import LogReader
|
||||
from openpilot.selfdrive.test.process_replay.migration import migrate_all
|
||||
from openpilot.selfdrive.test.process_replay.process_replay import replay_process_with_name
|
||||
|
||||
TEST_ROUTE = "ff2bd20623fcaeaa|2023-09-05--10-14-54/4"
|
||||
GPS_MESSAGES = ['gpsLocationExternal', 'gpsLocation']
|
||||
SELECT_COMPARE_FIELDS = {
|
||||
'yaw_rate': ['angularVelocityCalibrated', 'value', 2],
|
||||
'roll': ['orientationNED', 'value', 0],
|
||||
'gps_flag': ['gpsOK'],
|
||||
'inputs_flag': ['inputsOK'],
|
||||
'sensors_flag': ['sensorsOK'],
|
||||
}
|
||||
JUNK_IDX = 100
|
||||
|
||||
|
||||
class Scenario(Enum):
|
||||
BASE = 'base'
|
||||
GPS_OFF = 'gps_off'
|
||||
GPS_OFF_MIDWAY = 'gps_off_midway'
|
||||
GPS_ON_MIDWAY = 'gps_on_midway'
|
||||
GPS_TUNNEL = 'gps_tunnel'
|
||||
GYRO_OFF = 'gyro_off'
|
||||
GYRO_SPIKE_MIDWAY = 'gyro_spike_midway'
|
||||
ACCEL_OFF = 'accel_off'
|
||||
ACCEL_SPIKE_MIDWAY = 'accel_spike_midway'
|
||||
|
||||
|
||||
def get_select_fields_data(logs):
|
||||
def get_nested_keys(msg, keys):
|
||||
val = None
|
||||
for key in keys:
|
||||
val = getattr(msg if val is None else val, key) if isinstance(key, str) else val[key]
|
||||
return val
|
||||
llk = [x.liveLocationKalman for x in logs if x.which() == 'liveLocationKalman']
|
||||
data = defaultdict(list)
|
||||
for msg in llk:
|
||||
for key, fields in SELECT_COMPARE_FIELDS.items():
|
||||
data[key].append(get_nested_keys(msg, fields))
|
||||
for key in data:
|
||||
data[key] = np.array(data[key][JUNK_IDX:], dtype=float)
|
||||
return data
|
||||
|
||||
|
||||
def run_scenarios(scenario, logs):
|
||||
if scenario == Scenario.BASE:
|
||||
pass
|
||||
|
||||
elif scenario == Scenario.GPS_OFF:
|
||||
logs = sorted([x for x in logs if x.which() not in GPS_MESSAGES], key=lambda x: x.logMonoTime)
|
||||
|
||||
elif scenario == Scenario.GPS_OFF_MIDWAY:
|
||||
non_gps = [x for x in logs if x.which() not in GPS_MESSAGES]
|
||||
gps = [x for x in logs if x.which() in GPS_MESSAGES]
|
||||
logs = sorted(non_gps + gps[: len(gps) // 2], key=lambda x: x.logMonoTime)
|
||||
|
||||
elif scenario == Scenario.GPS_ON_MIDWAY:
|
||||
non_gps = [x for x in logs if x.which() not in GPS_MESSAGES]
|
||||
gps = [x for x in logs if x.which() in GPS_MESSAGES]
|
||||
logs = sorted(non_gps + gps[len(gps) // 2:], key=lambda x: x.logMonoTime)
|
||||
|
||||
elif scenario == Scenario.GPS_TUNNEL:
|
||||
non_gps = [x for x in logs if x.which() not in GPS_MESSAGES]
|
||||
gps = [x for x in logs if x.which() in GPS_MESSAGES]
|
||||
logs = sorted(non_gps + gps[:len(gps) // 4] + gps[-len(gps) // 4:], key=lambda x: x.logMonoTime)
|
||||
|
||||
elif scenario == Scenario.GYRO_OFF:
|
||||
logs = sorted([x for x in logs if x.which() != 'gyroscope'], key=lambda x: x.logMonoTime)
|
||||
|
||||
elif scenario == Scenario.GYRO_SPIKE_MIDWAY:
|
||||
non_gyro = [x for x in logs if x.which() not in 'gyroscope']
|
||||
gyro = [x for x in logs if x.which() in 'gyroscope']
|
||||
temp = gyro[len(gyro) // 2].as_builder()
|
||||
temp.gyroscope.gyroUncalibrated.v[0] += 3.0
|
||||
gyro[len(gyro) // 2] = temp.as_reader()
|
||||
logs = sorted(non_gyro + gyro, key=lambda x: x.logMonoTime)
|
||||
|
||||
elif scenario == Scenario.ACCEL_OFF:
|
||||
logs = sorted([x for x in logs if x.which() != 'accelerometer'], key=lambda x: x.logMonoTime)
|
||||
|
||||
elif scenario == Scenario.ACCEL_SPIKE_MIDWAY:
|
||||
non_accel = [x for x in logs if x.which() not in 'accelerometer']
|
||||
accel = [x for x in logs if x.which() in 'accelerometer']
|
||||
temp = accel[len(accel) // 2].as_builder()
|
||||
temp.accelerometer.acceleration.v[0] += 10.0
|
||||
accel[len(accel) // 2] = temp.as_reader()
|
||||
logs = sorted(non_accel + accel, key=lambda x: x.logMonoTime)
|
||||
|
||||
replayed_logs = replay_process_with_name(name='locationd', lr=logs)
|
||||
return get_select_fields_data(logs), get_select_fields_data(replayed_logs)
|
||||
|
||||
|
||||
@pytest.mark.xdist_group("test_locationd_scenarios")
|
||||
@pytest.mark.shared_download_cache
|
||||
class TestLocationdScenarios(unittest.TestCase):
|
||||
"""
|
||||
Test locationd with different scenarios. In all these scenarios, we expect the following:
|
||||
- locationd kalman filter should never go unstable (we care mostly about yaw_rate, roll, gpsOK, inputsOK, sensorsOK)
|
||||
- faulty values should be ignored, with appropriate flags set
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.logs = migrate_all(LogReader(TEST_ROUTE))
|
||||
|
||||
def test_base(self):
|
||||
"""
|
||||
Test: unchanged log
|
||||
Expected Result:
|
||||
- yaw_rate: unchanged
|
||||
- roll: unchanged
|
||||
"""
|
||||
orig_data, replayed_data = run_scenarios(Scenario.BASE, self.logs)
|
||||
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2)))
|
||||
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5)))
|
||||
|
||||
def test_gps_off(self):
|
||||
"""
|
||||
Test: no GPS message for the entire segment
|
||||
Expected Result:
|
||||
- yaw_rate: unchanged
|
||||
- roll:
|
||||
- gpsOK: False
|
||||
"""
|
||||
orig_data, replayed_data = run_scenarios(Scenario.GPS_OFF, self.logs)
|
||||
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2)))
|
||||
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5)))
|
||||
self.assertTrue(np.all(replayed_data['gps_flag'] == 0.0))
|
||||
|
||||
def test_gps_off_midway(self):
|
||||
"""
|
||||
Test: no GPS message for the second half of the segment
|
||||
Expected Result:
|
||||
- yaw_rate: unchanged
|
||||
- roll:
|
||||
- gpsOK: True for the first half, False for the second half
|
||||
"""
|
||||
orig_data, replayed_data = run_scenarios(Scenario.GPS_OFF_MIDWAY, self.logs)
|
||||
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2)))
|
||||
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5)))
|
||||
self.assertTrue(np.diff(replayed_data['gps_flag'])[512] == -1.0)
|
||||
|
||||
def test_gps_on_midway(self):
|
||||
"""
|
||||
Test: no GPS message for the first half of the segment
|
||||
Expected Result:
|
||||
- yaw_rate: unchanged
|
||||
- roll:
|
||||
- gpsOK: False for the first half, True for the second half
|
||||
"""
|
||||
orig_data, replayed_data = run_scenarios(Scenario.GPS_ON_MIDWAY, self.logs)
|
||||
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2)))
|
||||
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(1.5)))
|
||||
self.assertTrue(np.diff(replayed_data['gps_flag'])[505] == 1.0)
|
||||
|
||||
def test_gps_tunnel(self):
|
||||
"""
|
||||
Test: no GPS message for the middle section of the segment
|
||||
Expected Result:
|
||||
- yaw_rate: unchanged
|
||||
- roll:
|
||||
- gpsOK: False for the middle section, True for the rest
|
||||
"""
|
||||
orig_data, replayed_data = run_scenarios(Scenario.GPS_TUNNEL, self.logs)
|
||||
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2)))
|
||||
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5)))
|
||||
self.assertTrue(np.diff(replayed_data['gps_flag'])[213] == -1.0)
|
||||
self.assertTrue(np.diff(replayed_data['gps_flag'])[805] == 1.0)
|
||||
|
||||
def test_gyro_off(self):
|
||||
"""
|
||||
Test: no gyroscope message for the entire segment
|
||||
Expected Result:
|
||||
- yaw_rate: 0
|
||||
- roll: 0
|
||||
- sensorsOK: False
|
||||
"""
|
||||
_, replayed_data = run_scenarios(Scenario.GYRO_OFF, self.logs)
|
||||
self.assertTrue(np.allclose(replayed_data['yaw_rate'], 0.0))
|
||||
self.assertTrue(np.allclose(replayed_data['roll'], 0.0))
|
||||
self.assertTrue(np.all(replayed_data['sensors_flag'] == 0.0))
|
||||
|
||||
def test_gyro_spikes(self):
|
||||
"""
|
||||
Test: a gyroscope spike in the middle of the segment
|
||||
Expected Result:
|
||||
- yaw_rate: unchanged
|
||||
- roll: unchanged
|
||||
- inputsOK: False for some time after the spike, True for the rest
|
||||
"""
|
||||
orig_data, replayed_data = run_scenarios(Scenario.GYRO_SPIKE_MIDWAY, self.logs)
|
||||
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2)))
|
||||
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5)))
|
||||
self.assertTrue(np.diff(replayed_data['inputs_flag'])[500] == -1.0)
|
||||
self.assertTrue(np.diff(replayed_data['inputs_flag'])[694] == 1.0)
|
||||
|
||||
def test_accel_off(self):
|
||||
"""
|
||||
Test: no accelerometer message for the entire segment
|
||||
Expected Result:
|
||||
- yaw_rate: 0
|
||||
- roll: 0
|
||||
- sensorsOK: False
|
||||
"""
|
||||
_, replayed_data = run_scenarios(Scenario.ACCEL_OFF, self.logs)
|
||||
self.assertTrue(np.allclose(replayed_data['yaw_rate'], 0.0))
|
||||
self.assertTrue(np.allclose(replayed_data['roll'], 0.0))
|
||||
self.assertTrue(np.all(replayed_data['sensors_flag'] == 0.0))
|
||||
|
||||
def test_accel_spikes(self):
|
||||
"""
|
||||
ToDo:
|
||||
Test: an accelerometer spike in the middle of the segment
|
||||
Expected Result: Right now, the kalman filter is not robust to small spikes like it is to gyroscope spikes.
|
||||
"""
|
||||
orig_data, replayed_data = run_scenarios(Scenario.ACCEL_SPIKE_MIDWAY, self.logs)
|
||||
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2)))
|
||||
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5)))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user