#!/usr/bin/env python3 import os import sys import signal import itertools import math import time import requests import shutil import subprocess import datetime from multiprocessing import Process, Event from typing import NoReturn, Optional from struct import unpack_from, calcsize, pack from cereal import log import cereal.messaging as messaging from openpilot.common.gpio import gpio_init, gpio_set from openpilot.common.retry import retry from openpilot.system.hardware.tici.pins import GPIO from openpilot.common.swaglog import cloudlog from openpilot.system.qcomgpsd.modemdiag import ModemDiag, DIAG_LOG_F, setup_logs, send_recv from openpilot.system.qcomgpsd.structs import (dict_unpacker, position_report, relist, gps_measurement_report, gps_measurement_report_sv, glonass_measurement_report, glonass_measurement_report_sv, oemdre_measurement_report, oemdre_measurement_report_sv, oemdre_svpoly_report, LOG_GNSS_GPS_MEASUREMENT_REPORT, LOG_GNSS_GLONASS_MEASUREMENT_REPORT, LOG_GNSS_POSITION_REPORT, LOG_GNSS_OEMDRE_MEASUREMENT_REPORT, LOG_GNSS_OEMDRE_SVPOLY_REPORT) DEBUG = int(os.getenv("DEBUG", "0"))==1 ASSIST_DATA_FILE = '/tmp/xtra3grc.bin' ASSIST_DATA_FILE_DOWNLOAD = ASSIST_DATA_FILE + '.download' ASSISTANCE_URL = 'http://xtrapath3.izatcloud.net/xtra3grc.bin' LOG_TYPES = [ LOG_GNSS_GPS_MEASUREMENT_REPORT, LOG_GNSS_GLONASS_MEASUREMENT_REPORT, LOG_GNSS_OEMDRE_MEASUREMENT_REPORT, LOG_GNSS_POSITION_REPORT, LOG_GNSS_OEMDRE_SVPOLY_REPORT, ] miscStatusFields = { "multipathEstimateIsValid": 0, "directionIsValid": 1, } measurementStatusFields = { "subMillisecondIsValid": 0, "subBitTimeIsKnown": 1, "satelliteTimeIsKnown": 2, "bitEdgeConfirmedFromSignal": 3, "measuredVelocity": 4, "fineOrCoarseVelocity": 5, "lockPointValid": 6, "lockPointPositive": 7, "lastUpdateFromDifference": 9, "lastUpdateFromVelocityDifference": 10, "strongIndicationOfCrossCorelation": 11, "tentativeMeasurement": 12, "measurementNotUsable": 13, "sirCheckIsNeeded": 14, "probationMode": 15, "multipathIndicator": 24, "imdJammingIndicator": 25, "lteB13TxJammingIndicator": 26, "freshMeasurementIndicator": 27, } measurementStatusGPSFields = { "gpsRoundRobinRxDiversity": 18, "gpsRxDiversity": 19, "gpsLowBandwidthRxDiversityCombined": 20, "gpsHighBandwidthNu4": 21, "gpsHighBandwidthNu8": 22, "gpsHighBandwidthUniform": 23, } measurementStatusGlonassFields = { "glonassMeanderBitEdgeValid": 16, "glonassTimeMarkValid": 17 } # GPS tracking settings last_reported_latitude = None last_reported_longitude = None last_reported_time = time.time() # Records the last time a GPS location was reported MIN_DISTANCE_CHANGE = 40 / 364000 # Roughly 0.00011 degrees @retry(attempts=10, delay=1.0) def try_setup_logs(diag, logs): return setup_logs(diag, logs) @retry(attempts=3, delay=1.0) def at_cmd(cmd: str) -> Optional[str]: return subprocess.check_output(f"mmcli -m any --timeout 30 --command='{cmd}'", shell=True, encoding='utf8') def gps_enabled() -> bool: return "QGPS: 1" in at_cmd("AT+QGPS?") def download_assistance(): try: response = requests.get(ASSISTANCE_URL, timeout=5, stream=True) with open(ASSIST_DATA_FILE_DOWNLOAD, 'wb') as fp: for chunk in response.iter_content(chunk_size=8192): fp.write(chunk) if fp.tell() > 1e5: cloudlog.error("Qcom assistance data larger than expected") return os.rename(ASSIST_DATA_FILE_DOWNLOAD, ASSIST_DATA_FILE) except requests.exceptions.RequestException: cloudlog.exception("Failed to download assistance file") return def downloader_loop(event): if os.path.exists(ASSIST_DATA_FILE): os.remove(ASSIST_DATA_FILE) alt_path = os.getenv("QCOM_ALT_ASSISTANCE_PATH", None) if alt_path is not None and os.path.exists(alt_path): shutil.copyfile(alt_path, ASSIST_DATA_FILE) try: while not os.path.exists(ASSIST_DATA_FILE) and not event.is_set(): download_assistance() event.wait(timeout=10) except KeyboardInterrupt: pass @retry(attempts=5, delay=0.2, ignore_failure=True) def inject_assistance(): cmd = f"mmcli -m any --timeout 30 --location-inject-assistance-data={ASSIST_DATA_FILE}" subprocess.check_output(cmd, stderr=subprocess.PIPE, shell=True) cloudlog.info("successfully loaded assistance data") @retry(attempts=5, delay=1.0) def setup_quectel(diag: ModemDiag) -> bool: ret = False # enable OEMDRE in the NV # TODO: it has to reboot for this to take effect DIAG_NV_READ_F = 38 DIAG_NV_WRITE_F = 39 NV_GNSS_OEM_FEATURE_MASK = 7165 send_recv(diag, DIAG_NV_WRITE_F, pack(' NoReturn: global last_reported_time global last_reported_latitude global last_reported_longitude unpack_gps_meas, size_gps_meas = dict_unpacker(gps_measurement_report, True) unpack_gps_meas_sv, size_gps_meas_sv = dict_unpacker(gps_measurement_report_sv, True) unpack_glonass_meas, size_glonass_meas = dict_unpacker(glonass_measurement_report, True) unpack_glonass_meas_sv, size_glonass_meas_sv = dict_unpacker(glonass_measurement_report_sv, True) unpack_oemdre_meas, size_oemdre_meas = dict_unpacker(oemdre_measurement_report, True) unpack_oemdre_meas_sv, size_oemdre_meas_sv = dict_unpacker(oemdre_measurement_report_sv, True) unpack_svpoly, _ = dict_unpacker(oemdre_svpoly_report, True) unpack_position, _ = dict_unpacker(position_report) unpack_position, _ = dict_unpacker(position_report) wait_for_modem() stop_download_event = Event() assist_fetch_proc = Process(target=downloader_loop, args=(stop_download_event,)) assist_fetch_proc.start() def cleanup(sig, frame): cloudlog.warning("caught sig disabling quectel gps") gpio_set(GPIO.GNSS_PWR_EN, False) teardown_quectel(diag) cloudlog.warning("quectel cleanup done") stop_download_event.set() assist_fetch_proc.kill() assist_fetch_proc.join() sys.exit(0) signal.signal(signal.SIGINT, cleanup) signal.signal(signal.SIGTERM, cleanup) # connect to modem diag = ModemDiag() r = setup_quectel(diag) want_assistance = not r cloudlog.warning("quectel setup done") gpio_init(GPIO.GNSS_PWR_EN, True) gpio_set(GPIO.GNSS_PWR_EN, True) pm = messaging.PubMaster(['qcomGnss', 'gpsLocation']) while 1: if os.path.exists(ASSIST_DATA_FILE) and want_assistance: setup_quectel(diag) want_assistance = False opcode, payload = diag.recv() if opcode != DIAG_LOG_F: cloudlog.error(f"Unhandled opcode: {opcode}") continue (pending_msgs, log_outer_length), inner_log_packet = unpack_from(' 0: cloudlog.debug("have %d pending messages" % pending_msgs) assert log_outer_length == len(inner_log_packet) (log_inner_length, log_type, log_time), log_payload = unpack_from('= 1 and distance_change >= MIN_DISTANCE_CHANGE: should_report = True else: if time_since_last_report >= 30 and distance_change >= MIN_DISTANCE_CHANGE: should_report = True # Execute reporting if conditions are met if should_report: if os.path.exists("/data/brian/gps_tracking.sh"): # Update the last reported time and location last_reported_time = current_time last_reported_latitude = gps.latitude last_reported_longitude = gps.longitude # Call the script subprocess.run(["/data/brian/gps_tracking.sh", gps.latitude, gps.longitude, gps.speed], check=True) elif log_type == LOG_GNSS_OEMDRE_SVPOLY_REPORT: msg = messaging.new_message('qcomGnss', valid=True) dat = unpack_svpoly(log_payload) dat = relist(dat) gnss = msg.qcomGnss gnss.logTs = log_time gnss.init('drSvPoly') poly = gnss.drSvPoly for k,v in dat.items(): if k == "version": assert v == 2 elif k == "flags": pass else: setattr(poly, k, v) ''' # Timestamp glonass polys with GPSTime from laika.gps_time import GPSTime, utc_to_gpst, get_leap_seconds from laika.helpers import get_prn_from_nmea_id prn = get_prn_from_nmea_id(poly.svId) if prn[0] == 'R': epoch = GPSTime(current_gps_time.week, (poly.t0 - 3*SECS_IN_HR + SECS_IN_DAY) % (SECS_IN_WEEK) + get_leap_seconds(current_gps_time)) else: epoch = GPSTime(current_gps_time.week, poly.t0) # handle week rollover if epoch.tow < SECS_IN_DAY and current_gps_time.tow > 6*SECS_IN_DAY: epoch.week += 1 elif epoch.tow > 6*SECS_IN_DAY and current_gps_time.tow < SECS_IN_DAY: epoch.week -= 1 poly.gpsWeek = epoch.week poly.gpsTow = epoch.tow ''' pm.send('qcomGnss', msg) elif log_type in [LOG_GNSS_GPS_MEASUREMENT_REPORT, LOG_GNSS_GLONASS_MEASUREMENT_REPORT]: msg = messaging.new_message('qcomGnss', valid=True) gnss = msg.qcomGnss gnss.logTs = log_time gnss.init('measurementReport') report = gnss.measurementReport if log_type == LOG_GNSS_GPS_MEASUREMENT_REPORT: dat = unpack_gps_meas(log_payload) sats = log_payload[size_gps_meas:] unpack_meas_sv, size_meas_sv = unpack_gps_meas_sv, size_gps_meas_sv report.source = 0 # gps measurement_status_fields = (measurementStatusFields.items(), measurementStatusGPSFields.items()) elif log_type == LOG_GNSS_GLONASS_MEASUREMENT_REPORT: dat = unpack_glonass_meas(log_payload) sats = log_payload[size_glonass_meas:] unpack_meas_sv, size_meas_sv = unpack_glonass_meas_sv, size_glonass_meas_sv report.source = 1 # glonass measurement_status_fields = (measurementStatusFields.items(), measurementStatusGlonassFields.items()) else: raise RuntimeError(f"invalid log_type: {log_type}") for k,v in dat.items(): if k == "version": assert v == 0 elif k == "week": report.gpsWeek = v elif k == "svCount": pass else: setattr(report, k, v) report.init('sv', dat['svCount']) if dat['svCount'] > 0: assert len(sats)//dat['svCount'] == size_meas_sv for i in range(dat['svCount']): sv = report.sv[i] sv.init('measurementStatus') sat = unpack_meas_sv(sats[size_meas_sv*i:size_meas_sv*(i+1)]) for k,v in sat.items(): if k == "parityErrorCount": sv.gpsParityErrorCount = v elif k == "frequencyIndex": sv.glonassFrequencyIndex = v elif k == "hemmingErrorCount": sv.glonassHemmingErrorCount = v elif k == "measurementStatus": for kk,vv in itertools.chain(*measurement_status_fields): setattr(sv.measurementStatus, kk, bool(v & (1<