This commit is contained in:
concordia
2024-06-15 20:20:54 -05:00
parent 9dc2951e07
commit 720c9ccbc8
2 changed files with 73 additions and 97 deletions

View File

@@ -52,6 +52,10 @@ def frogpilot_boot_functions(frogpilot_functions):
print(f"An unexpected error occurred: {e}") print(f"An unexpected error occurred: {e}")
def manager_init(frogpilot_functions) -> None: def manager_init(frogpilot_functions) -> None:
timestamp = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
log_dir = f"/data/log2/{timestamp}"
os.makedirs(log_dir, exist_ok=True)
frogpilot_boot = threading.Thread(target=frogpilot_boot_functions, args=(frogpilot_functions,)) frogpilot_boot = threading.Thread(target=frogpilot_boot_functions, args=(frogpilot_functions,))
frogpilot_boot.start() frogpilot_boot.start()
@@ -354,6 +358,8 @@ def manager_init(frogpilot_functions) -> None:
for p in managed_processes.values(): for p in managed_processes.values():
p.prepare() p.prepare()
return log_dir
def manager_cleanup() -> None: def manager_cleanup() -> None:
# send signals to kill all procs # send signals to kill all procs
@@ -368,7 +374,7 @@ def manager_cleanup() -> None:
last_running = "" last_running = ""
def manager_thread(frogpilot_functions) -> None: def manager_thread(frogpilot_functions, log_dir) -> None:
global last_running global last_running
cloudlog.bind(daemon="manager") cloudlog.bind(daemon="manager")
@@ -389,7 +395,7 @@ def manager_thread(frogpilot_functions) -> None:
pm = messaging.PubMaster(['managerState']) pm = messaging.PubMaster(['managerState'])
write_onroad_params(False, params) write_onroad_params(False, params)
ensure_running(managed_processes.values(), False, params=params, CP=sm['carParams'], not_run=ignore) ensure_running(managed_processes.values(), False, params=params, CP=sm['carParams'], not_run=ignore, log_dir=log_dir)
started_prev = False started_prev = False
@@ -418,7 +424,7 @@ def manager_thread(frogpilot_functions) -> None:
started_prev = started started_prev = started
ensure_running(managed_processes.values(), started, params=params, CP=sm['carParams'], not_run=ignore) ensure_running(managed_processes.values(), started, params=params, CP=sm['carParams'], not_run=ignore, log_dir=log_dir)
running = ' '.join("{}{}\u001b[0m".format("\u001b[32m" if p.proc.is_alive() else "\u001b[31m", p.name) running = ' '.join("{}{}\u001b[0m".format("\u001b[32m" if p.proc.is_alive() else "\u001b[31m", p.name)
for p in managed_processes.values() if p.proc) for p in managed_processes.values() if p.proc)

View File

@@ -20,39 +20,33 @@ from openpilot.common.swaglog import cloudlog
WATCHDOG_FN = "/dev/shm/wd_" WATCHDOG_FN = "/dev/shm/wd_"
ENABLE_WATCHDOG = os.getenv("NO_WATCHDOG") is None ENABLE_WATCHDOG = os.getenv("NO_WATCHDOG") is None
def modified_nativelauncher(pargs: list[str], cwd: str, name: str, log_path: str) -> None:
os.environ['MANAGER_DAEMON'] = name
log_file_path = os.path.join(log_path, f"{name}.log")
with open(log_file_path, 'a') as log_file:
os.chdir(cwd)
proc = subprocess.Popen(pargs, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1, universal_newlines=True)
for line in proc.stdout:
print(line, end='')
log_file.write(line)
proc.wait()
def launcher(proc: str, name: str) -> None: def modified_launcher(proc: str, name: str, log_path: str) -> None:
try: try:
# import the process mod = importlib.import_module(proc)
mod = importlib.import_module(proc) setproctitle(proc)
messaging.context = messaging.Context()
# rename the process cloudlog.bind(daemon=name)
setproctitle(proc) sentry.set_tag("daemon", name)
log_file_path = os.path.join(log_path, f"{name}.log")
# create new context since we forked with open(log_file_path, 'a') as log_file, subprocess.Popen(['python', '-m', proc], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1, universal_newlines=True) as proc:
messaging.context = messaging.Context() for line in proc.stdout:
print(line, end='')
# add daemon name tag to logs log_file.write(line)
cloudlog.bind(daemon=name) proc.wait()
sentry.set_tag("daemon", name) except Exception:
sentry.capture_exception()
# exec the process raise
mod.main()
except KeyboardInterrupt:
cloudlog.warning(f"child {proc} got SIGINT")
except Exception:
# can't install the crash handler because sys.excepthook doesn't play nice
# with threads, so catch it here.
sentry.capture_exception()
raise
def nativelauncher(pargs: list[str], cwd: str, name: str) -> None:
os.environ['MANAGER_DAEMON'] = name
# exec the process
os.chdir(cwd)
os.execvp(pargs[0], pargs)
def join_process(process: Process, timeout: float) -> None: def join_process(process: Process, timeout: float) -> None:
@@ -182,21 +176,11 @@ class NativeProcess(ManagerProcess):
def prepare(self) -> None: def prepare(self) -> None:
pass pass
def start(self) -> None: def start(self, log_path: str) -> None:
# In case we only tried a non blocking stop we need to stop it before restarting if self.shutting_down or self.proc is not None:
if self.shutting_down: return
self.stop() self.proc = Process(target=modified_nativelauncher, args=(self.cmdline, os.path.join(BASEDIR, self.cwd), self.name, log_path))
if self.proc is not None:
return
cwd = os.path.join(BASEDIR, self.cwd)
cloudlog.info(f"starting process {self.name}")
self.proc = Process(name=self.name, target=self.launcher, args=(self.cmdline, cwd, self.name))
self.proc.start() self.proc.start()
self.watchdog_seen = False
self.shutting_down = False
class PythonProcess(ManagerProcess): class PythonProcess(ManagerProcess):
def __init__(self, name, module, should_run, enabled=True, sigkill=False, watchdog_max_dt=None): def __init__(self, name, module, should_run, enabled=True, sigkill=False, watchdog_max_dt=None):
@@ -213,21 +197,14 @@ class PythonProcess(ManagerProcess):
cloudlog.info(f"preimporting {self.module}") cloudlog.info(f"preimporting {self.module}")
importlib.import_module(self.module) importlib.import_module(self.module)
def start(self) -> None: def start(self, log_path: str) -> None:
# In case we only tried a non blocking stop we need to stop it before restarting if self.shutting_down or self.proc is not None:
if self.shutting_down:
self.stop()
if self.proc is not None:
return return
self.proc = Process(name=self.name, target=modified_launcher, args=(self.module, self.name, log_path))
cloudlog.info(f"starting python {self.module}")
self.proc = Process(name=self.name, target=self.launcher, args=(self.module, self.name))
self.proc.start() self.proc.start()
self.watchdog_seen = False self.watchdog_seen = False
self.shutting_down = False self.shutting_down = False
class DaemonProcess(ManagerProcess): class DaemonProcess(ManagerProcess):
"""Python process that has to stay running across manager restart. """Python process that has to stay running across manager restart.
This is used for athena so you don't lose SSH access when restarting manager.""" This is used for athena so you don't lose SSH access when restarting manager."""
@@ -245,50 +222,43 @@ class DaemonProcess(ManagerProcess):
def prepare(self) -> None: def prepare(self) -> None:
pass pass
def start(self) -> None: def start(self, log_path: str) -> None:
if self.params is None: if self.params is None:
self.params = Params() self.params = Params()
pid = self.params.get(self.param_name, encoding='utf-8') pid = self.params.get(self.param_name, encoding='utf-8')
if pid is not None: if pid is not None:
try: try:
os.kill(int(pid), 0) os.kill(int(pid), 0)
with open(f'/proc/{pid}/cmdline') as f: return # Process is already running
if self.module in f.read(): except OSError:
# daemon is running pass # Process not running, continue to start it
return
except (OSError, FileNotFoundError):
# process is dead
pass
cloudlog.info(f"starting daemon {self.name}") log_file_path = os.path.join(log_path, f"{self.name}.log")
proc = subprocess.Popen(['python', '-m', self.module], self.params.put(self.param_name, str(self.proc.pid))
stdin=open('/dev/null'), cloudlog.info(f"starting daemon {self.name}")
stdout=open('/dev/null', 'w'), self.proc = subprocess.Popen(['python', '-m', self.module],
stderr=open('/dev/null', 'w'), stdin=open('/dev/null'),
preexec_fn=os.setpgrp) stdout=open(log_file_path, 'a'),
stderr=subprocess.STDOUT,
self.params.put(self.param_name, str(proc.pid)) preexec_fn=os.setpgrp)
def stop(self, retry=True, block=True, sig=None) -> None: def stop(self, retry=True, block=True, sig=None) -> None:
pass pass
def ensure_running(procs: ValuesView[ManagerProcess], started: bool, params=None, CP: car.CarParams=None, not_run: list[str] | None=None, log_dir: str = None) -> list[ManagerProcess]:
if not_run is None:
not_run = []
def ensure_running(procs: ValuesView[ManagerProcess], started: bool, params=None, CP: car.CarParams=None, running = []
not_run: list[str] | None=None) -> list[ManagerProcess]: for p in procs:
if not_run is None: log_path = log_dir+"/"+p.name+".log"
not_run = [] if p.enabled and p.name not in not_run and p.should_run(started, params, CP):
p.start(log_path)
running.append(p)
else:
p.stop(block=False)
running = [] p.check_watchdog(started)
for p in procs:
if p.enabled and p.name not in not_run and p.should_run(started, params, CP):
running.append(p)
else:
p.stop(block=False)
p.check_watchdog(started) return running
for p in running:
p.start()
return running