Files
clearpilot/selfdrive/frogpilot/frogpilot_process.py
Your Name f015a8368b wip
2024-05-05 05:10:06 -05:00

132 lines
4.3 KiB
Python

import datetime
import http.client
import os
import socket
import time
import urllib.error
import urllib.request
import cereal.messaging as messaging
from cereal import car, log
from openpilot.common.params import Params
from openpilot.common.realtime import DT_MDL, Priority, config_realtime_process
from openpilot.common.time import system_time_valid
from openpilot.system.hardware import HARDWARE
from openpilot.selfdrive.frogpilot.controls.frogpilot_planner import FrogPilotPlanner
from openpilot.selfdrive.frogpilot.controls.lib.frogpilot_functions import FrogPilotFunctions
from openpilot.selfdrive.frogpilot.controls.lib.model_manager import DEFAULT_MODEL, DEFAULT_MODEL_NAME, download_model, populate_models
from openpilot.selfdrive.frogpilot.controls.lib.theme_manager import ThemeManager
# DeviceState network-type value for Wi-Fi. Within this file it is referenced only by the
# commented-out logic in time_checks().
WIFI = log.DeviceState.NetworkType.wifi
# clearpilot: automatic update checks are disabled; the function below is intentionally a no-op.
def automatic_update_check(params):
  """Do nothing: automatic update handling is switched off in clearpilot.

  Args:
    params: accepted so existing callers keep working; it is never read.

  Returns:
    None, always.
  """
  # The former implementation is kept here, commented out, for reference:
  #
  #   update_available = params.get_bool("UpdaterFetchAvailable")
  #   update_ready = params.get_bool("UpdateAvailable")
  #   update_state = params.get("UpdaterState", encoding='utf8')
  #
  #   if update_ready:
  #     HARDWARE.reboot()
  #   elif update_available:
  #     os.system("pkill -SIGHUP -f selfdrive.updated.updated")
  #   elif update_state == "idle":
  #     os.system("pkill -SIGUSR1 -f selfdrive.updated.updated")
  return None
def github_pinged(url="https://github.com", timeout=5):
  """Report whether `url` can be opened; used as a quick connectivity probe.

  Args:
    url: address to request. Defaults to GitHub's front page.
    timeout: timeout, in seconds, passed through to `urllib.request.urlopen`.

  Returns:
    True if the request completed, False if it failed with a network, timeout,
    or HTTP-protocol error.
  """
  try:
    # Only success or failure matters, so close the response immediately; leaving it
    # open would leak a socket on every probe.
    with urllib.request.urlopen(url, timeout=timeout):
      return True
  except (OSError, http.client.HTTPException):
    # OSError is the base of urllib.error.URLError, socket.timeout and connection resets
    # (http.client.RemoteDisconnected included). HTTPException additionally covers malformed
    # or truncated responses, which would otherwise propagate out of this probe.
    return False
# clearpilot: periodic model-list and update checks are disabled; the function below is intentionally a no-op.
def time_checks(automatic_updates, deviceState, params):
  """Do nothing: the periodic model-list and update checks are switched off in clearpilot.

  All three arguments are accepted so existing callers keep working; none of them is read.

  Returns:
    None, always.
  """
  # The former implementation is kept here, commented out, for reference:
  #
  #   if github_pinged():
  #     populate_models()
  #
  #   screen_off = deviceState.screenBrightnessPercent == 0
  #   wifi_connection = deviceState.networkType == WIFI
  #
  #   if automatic_updates and screen_off and wifi_connection:
  #     automatic_update_check(params)
  return None
def frogpilot_thread():
  """Run the FrogPilot background loop; the `while True` below has no exit.

  Each pass updates the SubMaster, runs and publishes the FrogPilot planner while
  deviceState reports `started`, handles model-download and toggle-change requests
  signalled through the Params store at /dev/shm/params, and, once the system time is
  valid, runs the periodic checks and the holiday-theme refresh.
  """
  # presumably selects the CPU core and scheduling priority for this process -- confirm
  # against config_realtime_process
  config_realtime_process(5, Priority.CTRL_LOW)

  # Two Params stores: the default one, and a second one rooted at /dev/shm/params.
  params = Params()
  params_memory = Params("/dev/shm/params")

  frogpilot_functions = FrogPilotFunctions()
  theme_manager = ThemeManager()

  # CarParams are loaded lazily, the first time the device reports `started`.
  CP = None

  automatic_updates = params.get_bool("AutomaticUpdates")
  first_run = True
  model_list_empty = params.get("AvailableModelsNames", encoding='utf-8') is None
  time_validated = system_time_valid()

  pm = messaging.PubMaster(['frogpilotPlan'])
  sm = messaging.SubMaster(['carState', 'controlsState', 'deviceState', 'frogpilotCarControl', 'frogpilotNavigation',
                            'frogpilotPlan', 'liveLocationKalman', 'longitudinalPlan', 'modelV2', 'radarState'],
                           poll='modelV2', ignore_avg_freq=['radarState'])

  while True:
    sm.update()

    deviceState = sm['deviceState']
    started = deviceState.started

    if started:
      if CP is None:
        # block=True: wait here until the "CarParams" param is available.
        with car.CarParams.from_bytes(params.get("CarParams", block=True)) as msg:
          CP = msg
          # NOTE(review): frogpilot_planner is first bound here. Every later use is guarded
          # by `started`, and this branch runs earlier in the same pass, so it is bound
          # before use.
          frogpilot_planner = FrogPilotPlanner(CP)
          frogpilot_planner.update_frogpilot_params()

      # Plan and publish only on passes where a new modelV2 message arrived.
      if sm.updated['modelV2']:
        frogpilot_planner.update(sm['carState'], sm['controlsState'], sm['frogpilotCarControl'], sm['frogpilotNavigation'],
                                 sm['liveLocationKalman'], sm['modelV2'], sm['radarState'])
        frogpilot_planner.publish(sm, pm)

    # A pending download request is acted on only if the connectivity probe succeeds.
    # github_pinged() is evaluated only when "ModelToDownload" is set (short-circuit `and`).
    if params_memory.get("ModelToDownload", encoding='utf-8') is not None and github_pinged():
      download_model()

    if params_memory.get_bool("FrogPilotTogglesUpdated"):
      automatic_updates = params.get_bool("AutomaticUpdates")

      # With the model selector off, force the default model back into params.
      if not params.get_bool("ModelSelector"):
        params.put("Model", DEFAULT_MODEL)
        params.put("ModelName", DEFAULT_MODEL_NAME)

      # While started, reload the planner's params; otherwise back up the toggles.
      if started:
        frogpilot_planner.update_frogpilot_params()
      else:
        frogpilot_functions.backup_toggles()

    # Nothing below runs until system_time_valid() has returned True once. Note that this
    # `continue` also skips the sleep at the bottom of the loop.
    if not time_validated:
      time_validated = system_time_valid()
      if not time_validated:
        continue

    # NOTE(review): `second == 0` stays true for every pass that falls within that second,
    # so if the loop iterates more than once per second this body runs on each such pass,
    # not once per minute.
    if datetime.datetime.now().second == 0 or first_run or model_list_empty or params_memory.get_bool("ManualUpdateInitiated"):
      if not started or model_list_empty:
        time_checks(automatic_updates, deviceState, params)
        # Re-read the param so model_list_empty reflects whatever time_checks() left behind.
        model_list_empty = params.get("AvailableModelsNames", encoding='utf-8') is None

      theme_manager.update_holiday()

      first_run = False

    time.sleep(DT_MDL)
def main():
  """Process entry point: hand control to frogpilot_thread()."""
  frogpilot_thread()
# Start the loop only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
  main()