94 lines
2.7 KiB
Python
94 lines
2.7 KiB
Python
#!/usr/bin/env python3
|
|
import importlib
|
|
from typing import Optional
|
|
|
|
from cereal import messaging, car
|
|
from openpilot.common.params import Params
|
|
from openpilot.common.realtime import Ratekeeper, Priority, config_realtime_process
|
|
from openpilot.common.swaglog import cloudlog
|
|
|
|
|
|
# radar tracks
|
|
SPEED, ACCEL = 0, 1 # Kalman filter states enum
|
|
|
|
# stationary qualification parameters
|
|
V_EGO_STATIONARY = 4. # no stationary object flag below this speed
|
|
|
|
RADAR_TO_CENTER = 2.7 # (deprecated) RADAR is ~ 2.7m ahead from center of car
|
|
RADAR_TO_CAMERA = 1.52 # RADAR is ~ 1.5m ahead from center of mesh frame
|
|
|
|
|
|
class RadarD:
|
|
def __init__(self, radar_ts: float, delay: int = 0):
|
|
self.points: dict[int, tuple[float, float, float]] = {}
|
|
|
|
self.radar_tracks_valid = False
|
|
|
|
def update(self, rr: Optional[car.RadarData]):
|
|
radar_points = []
|
|
radar_errors = []
|
|
if rr is not None:
|
|
radar_points = rr.points
|
|
radar_errors = rr.errors
|
|
|
|
self.radar_tracks_valid = len(radar_errors) == 0
|
|
|
|
self.points = {}
|
|
for pt in radar_points:
|
|
self.points[pt.trackId] = (pt.dRel, pt.yRel, pt.vRel)
|
|
|
|
def publish(self):
|
|
tracks_msg = messaging.new_message('liveTracks', len(self.points))
|
|
tracks_msg.valid = self.radar_tracks_valid
|
|
for index, tid in enumerate(sorted(self.points.keys())):
|
|
tracks_msg.liveTracks[index] = {
|
|
"trackId": tid,
|
|
"dRel": float(self.points[tid][0]) + RADAR_TO_CAMERA,
|
|
"yRel": -float(self.points[tid][1]),
|
|
"vRel": float(self.points[tid][2]),
|
|
}
|
|
|
|
return tracks_msg
|
|
|
|
|
|
# publishes radar tracks
|
|
def main():
|
|
config_realtime_process(5, Priority.CTRL_LOW)
|
|
|
|
# wait for stats about the car to come in from controls
|
|
cloudlog.info("radard is waiting for CarParams")
|
|
with car.CarParams.from_bytes(Params().get("CarParams", block=True)) as msg:
|
|
CP = msg
|
|
cloudlog.info("radard got CarParams")
|
|
|
|
# import the radar from the fingerprint
|
|
cloudlog.info("radard is importing %s", CP.carName)
|
|
RadarInterface = importlib.import_module(f'selfdrive.car.{CP.carName}.radar_interface').RadarInterface
|
|
|
|
# *** setup messaging
|
|
can_sock = messaging.sub_sock('can')
|
|
pub_sock = messaging.pub_sock('liveTracks')
|
|
|
|
RI = RadarInterface(CP)
|
|
|
|
# TODO timing is different between cars, need a single time step for all cars
|
|
# TODO just take the fastest one for now, and keep resending same messages for slower radars
|
|
rk = Ratekeeper(1.0 / CP.radarTimeStep, print_delay_threshold=None)
|
|
RD = RadarD(CP.radarTimeStep, RI.delay)
|
|
|
|
while 1:
|
|
can_strings = messaging.drain_sock_raw(can_sock, wait_for_one=True)
|
|
rr = RI.update(can_strings)
|
|
if rr is None:
|
|
continue
|
|
|
|
RD.update(rr)
|
|
msg = RD.publish()
|
|
pub_sock.send(msg.to_bytes())
|
|
|
|
rk.monitor_time()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|