From a2cf02ea37b4f2e94bc7b572f0db258a71223776 Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 17 Jun 2024 12:12:54 -0500 Subject: [PATCH] wip --- selfdrive/manager/process.py | 185 +++++++++++++--------- selfdrive/manager/process_wip.txt | 251 ++++++++++++++++++++++++++++++ 2 files changed, 365 insertions(+), 71 deletions(-) create mode 100644 selfdrive/manager/process_wip.txt diff --git a/selfdrive/manager/process.py b/selfdrive/manager/process.py index d17eaad..607d3d0 100755 --- a/selfdrive/manager/process.py +++ b/selfdrive/manager/process.py @@ -19,44 +19,50 @@ from openpilot.common.swaglog import cloudlog WATCHDOG_FN = "/dev/shm/wd_" ENABLE_WATCHDOG = os.getenv("NO_WATCHDOG") is None -ENABLE_WATCHDOG = False # Fixme -_log_dir = "" -def nativelauncher(pargs: list[str], cwd: str, name: str, log_path: str) -> None: - os.environ['MANAGER_DAEMON'] = name - with open(log_path, 'a') as log_file: - os.chdir(cwd) - proc = subprocess.Popen(pargs, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1, universal_newlines=True) - log_file.write("Started "+name) - for line in proc.stdout: - print(line, end='') - log_file.write(line) - proc.wait() -def launcher(proc: str, name: str, log_path: str) -> None: - for _ in iter(int, 1): - try: - mod = importlib.import_module(proc) - setproctitle(proc) - messaging.context = messaging.Context() - cloudlog.bind(daemon=name) - sentry.set_tag("daemon", name) - with open(log_path, 'a') as log_file, subprocess.Popen(['python', '-m', proc], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1, universal_newlines=True) as proc: - log_file.write("Started "+name) - for line in proc.stdout: - print(line, end='') - log_file.write(line) - proc.wait() - except Exception as e: - print ("Fatal: "+name) - print (e) - sentry.capture_exception() +def launcher(proc: str, name: str) -> None: + try: + # import the process + mod = importlib.import_module(proc) + + # rename the process + setproctitle(proc) + + # create new context since we forked + messaging.context = messaging.Context() + + # add daemon name tag to logs + cloudlog.bind(daemon=name) + sentry.set_tag("daemon", name) + + # exec the process + mod.main() + except KeyboardInterrupt: + cloudlog.warning(f"child {proc} got SIGINT") + except Exception: + # can't install the crash handler because sys.excepthook doesn't play nice + # with threads, so catch it here. + sentry.capture_exception() + raise + + +def nativelauncher(pargs: list[str], cwd: str, name: str) -> None: + os.environ['MANAGER_DAEMON'] = name + + # exec the process + os.chdir(cwd) + os.execvp(pargs[0], pargs) + def join_process(process: Process, timeout: float) -> None: + # Process().join(timeout) will hang due to a python 3 bug: https://bugs.python.org/issue28382 + # We have to poll the exitcode instead t = time.monotonic() while time.monotonic() - t < timeout and process.exitcode is None: time.sleep(0.001) + class ManagerProcess(ABC): daemon = False sigkill = False @@ -64,6 +70,7 @@ class ManagerProcess(ABC): proc: Process | None = None enabled = True name = "" + last_watchdog_time = 0 watchdog_max_dt: int | None = None watchdog_seen = False @@ -78,20 +85,23 @@ class ManagerProcess(ABC): pass def restart(self) -> None: - if self.proc is not None and self.proc.exitcode is not None: - self.stop(sig=signal.SIGKILL, block=False) + self.stop(sig=signal.SIGKILL) self.start() def check_watchdog(self, started: bool) -> None: if self.watchdog_max_dt is None or self.proc is None: return + try: fn = WATCHDOG_FN + str(self.proc.pid) with open(fn, "rb") as f: + # TODO: why can't pylint find struct.unpack? self.last_watchdog_time = struct.unpack('Q', f.read())[0] except Exception: pass + dt = time.monotonic() - self.last_watchdog_time / 1e9 + if dt > self.watchdog_max_dt: if (self.watchdog_seen or self.always_watchdog and self.proc.exitcode is not None) and ENABLE_WATCHDOG: cloudlog.error(f"Watchdog timeout for {self.name} (exitcode {self.proc.exitcode}) restarting ({started=})") @@ -102,6 +112,7 @@ class ManagerProcess(ABC): def stop(self, retry: bool = True, block: bool = True, sig: signal.Signals = None) -> int | None: if self.proc is None: return None + if self.proc.exitcode is None: if not self.shutting_down: cloudlog.info(f"killing {self.name}") @@ -109,23 +120,39 @@ class ManagerProcess(ABC): sig = signal.SIGKILL if self.sigkill else signal.SIGINT self.signal(sig) self.shutting_down = True + if not block: return None + join_process(self.proc, 5) + + # If process failed to die send SIGKILL if self.proc.exitcode is None and retry: cloudlog.info(f"killing {self.name} with SIGKILL") self.signal(signal.SIGKILL) self.proc.join() + ret = self.proc.exitcode cloudlog.info(f"{self.name} is dead with {ret}") + if self.proc.exitcode is not None: self.shutting_down = False self.proc = None + return ret def signal(self, sig: int) -> None: - if self.proc is None or self.proc.exitcode is not None or self.proc.pid is None: + if self.proc is None: return + + # Don't signal if already exited + if self.proc.exitcode is not None and self.proc.pid is not None: + return + + # Can't signal if we don't have a pid + if self.proc.pid is None: + return + cloudlog.info(f"sending signal {sig} to {self.name}") os.kill(self.proc.pid, sig) @@ -156,12 +183,20 @@ class NativeProcess(ManagerProcess): pass def start(self) -> None: - global _log_dir - log_path = _log_dir+"/"+self.name+".log" - if self.shutting_down or self.proc is not None: - return - self.proc = Process(target=nativelauncher, args=(self.cmdline, os.path.join(BASEDIR, self.cwd), self.name, log_path)) + # In case we only tried a non blocking stop we need to stop it before restarting + if self.shutting_down: + self.stop() + + if self.proc is not None: + return + + cwd = os.path.join(BASEDIR, self.cwd) + cloudlog.info(f"starting process {self.name}") + self.proc = Process(name=self.name, target=self.launcher, args=(self.cmdline, cwd, self.name)) self.proc.start() + self.watchdog_seen = False + self.shutting_down = False + class PythonProcess(ManagerProcess): def __init__(self, name, module, should_run, enabled=True, sigkill=False, watchdog_max_dt=None): @@ -179,15 +214,20 @@ class PythonProcess(ManagerProcess): importlib.import_module(self.module) def start(self) -> None: - global _log_dir - log_path = _log_dir+"/"+self.name+".log" - if self.shutting_down or self.proc is not None: + # In case we only tried a non blocking stop we need to stop it before restarting + if self.shutting_down: + self.stop() + + if self.proc is not None: return - self.proc = Process(name=self.name, target=launcher, args=(self.module, self.name, log_path)) + + cloudlog.info(f"starting python {self.module}") + self.proc = Process(name=self.name, target=self.launcher, args=(self.module, self.name)) self.proc.start() self.watchdog_seen = False self.shutting_down = False + class DaemonProcess(ManagerProcess): """Python process that has to stay running across manager restart. This is used for athena so you don't lose SSH access when restarting manager.""" @@ -206,46 +246,49 @@ class DaemonProcess(ManagerProcess): pass def start(self) -> None: - global _log_dir - log_path = _log_dir+"/"+self.name+".log" if self.params is None: - self.params = Params() + self.params = Params() pid = self.params.get(self.param_name, encoding='utf-8') if pid is not None: - try: - os.kill(int(pid), 0) - return # Process is already running - except OSError: - pass # Process not running, continue to start it + try: + os.kill(int(pid), 0) + with open(f'/proc/{pid}/cmdline') as f: + if self.module in f.read(): + # daemon is running + return + except (OSError, FileNotFoundError): + # process is dead + pass cloudlog.info(f"starting daemon {self.name}") - self.proc = subprocess.Popen(['python', '-m', self.module], - stdin=open('/dev/null'), - stdout=open(log_path, 'a'), - stderr=subprocess.STDOUT, - preexec_fn=os.setpgrp) - self.params.put(self.param_name, str(self.proc.pid)) + proc = subprocess.Popen(['python', '-m', self.module], + stdin=open('/dev/null'), + stdout=open('/dev/null', 'w'), + stderr=open('/dev/null', 'w'), + preexec_fn=os.setpgrp) + + self.params.put(self.param_name, str(proc.pid)) def stop(self, retry=True, block=True, sig=None) -> None: pass -def ensure_running(procs: ValuesView[ManagerProcess], started: bool, params=None, CP: car.CarParams=None, not_run: list[str] | None=None, log_dir: str = None) -> list[ManagerProcess]: - global _log_dir - _log_dir = log_dir - if not_run is None: - not_run = [] +def ensure_running(procs: ValuesView[ManagerProcess], started: bool, params=None, CP: car.CarParams=None, + not_run: list[str] | None=None) -> list[ManagerProcess]: + if not_run is None: + not_run = [] - running = [] - for p in procs: - if p.enabled and p.name not in not_run and p.should_run(started, params, CP): - if p.proc is None or (hasattr(p.proc, 'exitcode') and p.proc.exitcode is not None): - p.start() - running.append(p) - else: - p.stop(block=False) + running = [] + for p in procs: + if p.enabled and p.name not in not_run and p.should_run(started, params, CP): + running.append(p) + else: + p.stop(block=False) - p.check_watchdog(started) + p.check_watchdog(started) - return running + for p in running: + p.start() + + return running diff --git a/selfdrive/manager/process_wip.txt b/selfdrive/manager/process_wip.txt new file mode 100644 index 0000000..a2f844f --- /dev/null +++ b/selfdrive/manager/process_wip.txt @@ -0,0 +1,251 @@ +import importlib +import os +import signal +import struct +import time +import subprocess +from collections.abc import Callable, ValuesView +from abc import ABC, abstractmethod +from multiprocessing import Process + +from setproctitle import setproctitle + +from cereal import car, log +import cereal.messaging as messaging +import openpilot.selfdrive.sentry as sentry +from openpilot.common.basedir import BASEDIR +from openpilot.common.params import Params +from openpilot.common.swaglog import cloudlog + +WATCHDOG_FN = "/dev/shm/wd_" +ENABLE_WATCHDOG = os.getenv("NO_WATCHDOG") is None +ENABLE_WATCHDOG = False # Fixme +_log_dir = "" + +def nativelauncher(pargs: list[str], cwd: str, name: str, log_path: str) -> None: + os.environ['MANAGER_DAEMON'] = name + with open(log_path, 'a') as log_file: + os.chdir(cwd) + proc = subprocess.Popen(pargs, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1, universal_newlines=True) + log_file.write("Started "+name) + for line in proc.stdout: + print(line, end='') + log_file.write(line) + proc.wait() + +def launcher(proc: str, name: str, log_path: str) -> None: + for _ in iter(int, 1): + try: + mod = importlib.import_module(proc) + setproctitle(proc) + messaging.context = messaging.Context() + cloudlog.bind(daemon=name) + sentry.set_tag("daemon", name) + with open(log_path, 'a') as log_file, subprocess.Popen(['python', '-m', proc], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1, universal_newlines=True) as proc: + log_file.write("Started "+name) + for line in proc.stdout: + print(line, end='') + log_file.write(line) + proc.wait() + except Exception as e: + print ("Fatal: "+name) + print (e) + sentry.capture_exception() + +def join_process(process: Process, timeout: float) -> None: + t = time.monotonic() + while time.monotonic() - t < timeout and process.exitcode is None: + time.sleep(0.001) + +class ManagerProcess(ABC): + daemon = False + sigkill = False + should_run: Callable[[bool, Params, car.CarParams], bool] + proc: Process | None = None + enabled = True + name = "" + last_watchdog_time = 0 + watchdog_max_dt: int | None = None + watchdog_seen = False + shutting_down = False + + @abstractmethod + def prepare(self) -> None: + pass + + @abstractmethod + def start(self) -> None: + pass + + def restart(self) -> None: + if self.proc is not None and self.proc.exitcode is not None: + self.stop(sig=signal.SIGKILL, block=False) + self.start() + + def check_watchdog(self, started: bool) -> None: + if self.watchdog_max_dt is None or self.proc is None: + return + try: + fn = WATCHDOG_FN + str(self.proc.pid) + with open(fn, "rb") as f: + self.last_watchdog_time = struct.unpack('Q', f.read())[0] + except Exception: + pass + dt = time.monotonic() - self.last_watchdog_time / 1e9 + if dt > self.watchdog_max_dt: + if (self.watchdog_seen or self.always_watchdog and self.proc.exitcode is not None) and ENABLE_WATCHDOG: + cloudlog.error(f"Watchdog timeout for {self.name} (exitcode {self.proc.exitcode}) restarting ({started=})") + self.restart() + else: + self.watchdog_seen = True + + def stop(self, retry: bool = True, block: bool = True, sig: signal.Signals = None) -> int | None: + if self.proc is None: + return None + if self.proc.exitcode is None: + if not self.shutting_down: + cloudlog.info(f"killing {self.name}") + if sig is None: + sig = signal.SIGKILL if self.sigkill else signal.SIGINT + self.signal(sig) + self.shutting_down = True + if not block: + return None + join_process(self.proc, 5) + if self.proc.exitcode is None and retry: + cloudlog.info(f"killing {self.name} with SIGKILL") + self.signal(signal.SIGKILL) + self.proc.join() + ret = self.proc.exitcode + cloudlog.info(f"{self.name} is dead with {ret}") + if self.proc.exitcode is not None: + self.shutting_down = False + self.proc = None + return ret + + def signal(self, sig: int) -> None: + if self.proc is None or self.proc.exitcode is not None or self.proc.pid is None: + return + cloudlog.info(f"sending signal {sig} to {self.name}") + os.kill(self.proc.pid, sig) + + def get_process_state_msg(self): + state = log.ManagerState.ProcessState.new_message() + state.name = self.name + if self.proc: + state.running = self.proc.is_alive() + state.shouldBeRunning = self.proc is not None and not self.shutting_down + state.pid = self.proc.pid or 0 + state.exitCode = self.proc.exitcode or 0 + return state + + +class NativeProcess(ManagerProcess): + def __init__(self, name, cwd, cmdline, should_run, enabled=True, sigkill=False, watchdog_max_dt=None, always_watchdog=False): + self.name = name + self.cwd = cwd + self.cmdline = cmdline + self.should_run = should_run + self.enabled = enabled + self.sigkill = sigkill + self.watchdog_max_dt = watchdog_max_dt + self.launcher = nativelauncher + self.always_watchdog = always_watchdog + + def prepare(self) -> None: + pass + + def start(self) -> None: + global _log_dir + log_path = _log_dir+"/"+self.name+".log" + if self.shutting_down or self.proc is not None: + return + self.proc = Process(target=nativelauncher, args=(self.cmdline, os.path.join(BASEDIR, self.cwd), self.name, log_path)) + self.proc.start() + +class PythonProcess(ManagerProcess): + def __init__(self, name, module, should_run, enabled=True, sigkill=False, watchdog_max_dt=None): + self.name = name + self.module = module + self.should_run = should_run + self.enabled = enabled + self.sigkill = sigkill + self.watchdog_max_dt = watchdog_max_dt + self.launcher = launcher + + def prepare(self) -> None: + if self.enabled: + cloudlog.info(f"preimporting {self.module}") + importlib.import_module(self.module) + + def start(self) -> None: + global _log_dir + log_path = _log_dir+"/"+self.name+".log" + if self.shutting_down or self.proc is not None: + return + self.proc = Process(name=self.name, target=launcher, args=(self.module, self.name, log_path)) + self.proc.start() + self.watchdog_seen = False + self.shutting_down = False + +class DaemonProcess(ManagerProcess): + """Python process that has to stay running across manager restart. + This is used for athena so you don't lose SSH access when restarting manager.""" + def __init__(self, name, module, param_name, enabled=True): + self.name = name + self.module = module + self.param_name = param_name + self.enabled = enabled + self.params = None + + @staticmethod + def should_run(started, params, CP): + return True + + def prepare(self) -> None: + pass + + def start(self) -> None: + global _log_dir + log_path = _log_dir+"/"+self.name+".log" + if self.params is None: + self.params = Params() + + pid = self.params.get(self.param_name, encoding='utf-8') + if pid is not None: + try: + os.kill(int(pid), 0) + return # Process is already running + except OSError: + pass # Process not running, continue to start it + + cloudlog.info(f"starting daemon {self.name}") + self.proc = subprocess.Popen(['python', '-m', self.module], + stdin=open('/dev/null'), + stdout=open(log_path, 'a'), + stderr=subprocess.STDOUT, + preexec_fn=os.setpgrp) + self.params.put(self.param_name, str(self.proc.pid)) + + def stop(self, retry=True, block=True, sig=None) -> None: + pass + + +def ensure_running(procs: ValuesView[ManagerProcess], started: bool, params=None, CP: car.CarParams=None, not_run: list[str] | None=None, log_dir: str = None) -> list[ManagerProcess]: + global _log_dir + _log_dir = log_dir + if not_run is None: + not_run = [] + + running = [] + for p in procs: + if p.enabled and p.name not in not_run and p.should_run(started, params, CP): + if p.proc is None or (hasattr(p.proc, 'exitcode') and p.proc.exitcode is not None): + p.start() + running.append(p) + else: + p.stop(block=False) + + p.check_watchdog(started) + + return running