Files
clearpilot/selfdrive/controls/radardless.py
Comma Device 1ff14d346f wip
2024-05-14 08:21:27 +00:00

94 lines
2.7 KiB
Python
Executable File

#!/usr/bin/env python3
import importlib
from typing import Optional
from cereal import messaging, car
from openpilot.common.params import Params
from openpilot.common.realtime import Ratekeeper, Priority, config_realtime_process
from openpilot.common.swaglog import cloudlog
# radar tracks
SPEED, ACCEL = 0, 1 # Kalman filter states enum
# stationary qualification parameters
V_EGO_STATIONARY = 4. # no stationary object flag below this speed
RADAR_TO_CENTER = 2.7 # (deprecated) RADAR is ~ 2.7m ahead from center of car
RADAR_TO_CAMERA = 1.52 # RADAR is ~ 1.5m ahead from center of mesh frame
class RadarD:
def __init__(self, radar_ts: float, delay: int = 0):
self.points: dict[int, tuple[float, float, float]] = {}
self.radar_tracks_valid = False
def update(self, rr: Optional[car.RadarData]):
radar_points = []
radar_errors = []
if rr is not None:
radar_points = rr.points
radar_errors = rr.errors
self.radar_tracks_valid = len(radar_errors) == 0
self.points = {}
for pt in radar_points:
self.points[pt.trackId] = (pt.dRel, pt.yRel, pt.vRel)
def publish(self):
tracks_msg = messaging.new_message('liveTracks', len(self.points))
tracks_msg.valid = self.radar_tracks_valid
for index, tid in enumerate(sorted(self.points.keys())):
tracks_msg.liveTracks[index] = {
"trackId": tid,
"dRel": float(self.points[tid][0]) + RADAR_TO_CAMERA,
"yRel": -float(self.points[tid][1]),
"vRel": float(self.points[tid][2]),
}
return tracks_msg
# publishes radar tracks
def main():
config_realtime_process(5, Priority.CTRL_LOW)
# wait for stats about the car to come in from controls
cloudlog.info("radard is waiting for CarParams")
with car.CarParams.from_bytes(Params().get("CarParams", block=True)) as msg:
CP = msg
cloudlog.info("radard got CarParams")
# import the radar from the fingerprint
cloudlog.info("radard is importing %s", CP.carName)
RadarInterface = importlib.import_module(f'selfdrive.car.{CP.carName}.radar_interface').RadarInterface
# *** setup messaging
can_sock = messaging.sub_sock('can')
pub_sock = messaging.pub_sock('liveTracks')
RI = RadarInterface(CP)
# TODO timing is different between cars, need a single time step for all cars
# TODO just take the fastest one for now, and keep resending same messages for slower radars
rk = Ratekeeper(1.0 / CP.radarTimeStep, print_delay_threshold=None)
RD = RadarD(CP.radarTimeStep, RI.delay)
while 1:
can_strings = messaging.drain_sock_raw(can_sock, wait_for_one=True)
rr = RI.update(can_strings)
if rr is None:
continue
RD.update(rr)
msg = RD.publish()
pub_sock.send(msg.to_bytes())
rk.monitor_time()
if __name__ == "__main__":
main()