From 445138519dc9af9cb406848d53b497a3b6ed4932 Mon Sep 17 00:00:00 2001
From: Your Name
Date: Mon, 17 Jun 2024 15:43:03 -0500
Subject: [PATCH] wip

---
 selfdrive/manager/process.py       |  83 ++-----
 selfdrive/manager/process_wip2.txt | 350 +++++++++++++++++++++++++++++
 2 files changed, 371 insertions(+), 62 deletions(-)
 create mode 100644 selfdrive/manager/process_wip2.txt

diff --git a/selfdrive/manager/process.py b/selfdrive/manager/process.py
index f7fb086..2d3f0c4 100755
--- a/selfdrive/manager/process.py
+++ b/selfdrive/manager/process.py
@@ -25,81 +25,40 @@
 timestamp = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
 _log_dir = f"/data/log2/{timestamp}"
 os.makedirs(_log_dir, exist_ok=True)
 
-# def launcher(proc: str, name: str, log_path: str) -> None:
-#   try:
-#     # import the process
-#     mod = importlib.import_module(proc)
-
-#     global _log_dir
-#     log_path = os.path.join(_log_dir, f"{name}.log")
-
-#     # rename the process
-#     setproctitle(proc)
-
-#     # create new context since we forked
-#     messaging.context = messaging.Context()
-
-#     # add daemon name tag to logs
-#     cloudlog.bind(daemon=name)
-#     sentry.set_tag("daemon", name)
-
-#     # exec the process
-#     mod.main()
-#   except KeyboardInterrupt:
-#     cloudlog.warning(f"child {proc} got SIGINT")
-#   except Exception as e:
-#     # can't install the crash handler because sys.excepthook doesn't play nice
-#     # with threads, so catch it here.
-#     with open(log_path, 'a') as file: file.write(str(e)+"\n")
-#     sentry.capture_exception()
-#     raise
 
 def launcher(proc: str, name: str) -> None:
   try:
-    # Import the process module
-    mod = importlib.import_module(proc)
+    # import the process
+    mod = importlib.import_module(proc)
 
-    # Path for logging
-    global _log_dir
-    log_path = os.path.join(_log_dir, f"{name}.log")
+    # rename the process
+    setproctitle(proc)
 
-    # Rename the process
-    setproctitle(name)
+    # create new context since we forked
+    messaging.context = messaging.Context()
 
-    # Create new context since we forked
-    messaging.context = messaging.Context()
-
-    # Add daemon name tag to logs
-    cloudlog.bind(daemon=name)
-    sentry.set_tag("daemon", name)
-
-    # Command construction
-    command = f"bash -c 'python -m {proc} 2>&1 | tee {log_path}'"
-
-    # Execute the command
-    subprocess.run(command, shell=True, executable='/bin/bash', cwd=os.path.dirname(mod.__file__))
+    # add daemon name tag to logs
+    cloudlog.bind(daemon=name)
+    sentry.set_tag("daemon", name)
 
+    # exec the process
+    mod.main()
  except KeyboardInterrupt:
-    cloudlog.warning(f"child {proc} got SIGINT")
-  except Exception as e:
-    with open(log_path, 'a') as file:
-      file.write(str(e) + "\n")
-    sentry.capture_exception()
-    raise
+    cloudlog.warning(f"child {proc} got SIGINT")
+  except Exception:
+    # can't install the crash handler because sys.excepthook doesn't play nice
+    # with threads, so catch it here.
+    sentry.capture_exception()
+    raise
+
 
 def nativelauncher(pargs: list[str], cwd: str, name: str) -> None:
   os.environ['MANAGER_DAEMON'] = name
-
-  global _log_dir
-  log_path = os.path.join(_log_dir, f"{name}.log")
-
-  # Command construction
-  command = f"bash -c \"{ ' '.join(pargs) } 2>&1 | tee {log_path}\""
-
-  # Execute the command in the specified directory
-  subprocess.run(command, shell=True, cwd=cwd, executable='/bin/bash')
+  # exec the process
+  os.chdir(cwd)
+  os.execvp(pargs[0], pargs)
 
 
 def join_process(process: Process, timeout: float) -> None:
   # Process().join(timeout) will hang due to a python 3 bug: https://bugs.python.org/issue28382
diff --git a/selfdrive/manager/process_wip2.txt b/selfdrive/manager/process_wip2.txt
new file mode 100644
index 0000000..7fa55e4
--- /dev/null
+++ b/selfdrive/manager/process_wip2.txt
@@ -0,0 +1,350 @@
+import importlib
+import os
+import signal
+import struct
+import datetime
+import time
+import subprocess
+from collections.abc import Callable, ValuesView
+from abc import ABC, abstractmethod
+from multiprocessing import Process
+
+from setproctitle import setproctitle
+
+from cereal import car, log
+import cereal.messaging as messaging
+import openpilot.selfdrive.sentry as sentry
+from openpilot.common.basedir import BASEDIR
+from openpilot.common.params import Params
+from openpilot.common.swaglog import cloudlog
+
+WATCHDOG_FN = "/dev/shm/wd_"
+ENABLE_WATCHDOG = os.getenv("NO_WATCHDOG") is None
+
+timestamp = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
+_log_dir = f"/data/log2/{timestamp}"
+os.makedirs(_log_dir, exist_ok=True)
+
+# def launcher(proc: str, name: str, log_path: str) -> None:
+#   try:
+#     # import the process
+#     mod = importlib.import_module(proc)
+
+#     global _log_dir
+#     log_path = os.path.join(_log_dir, f"{name}.log")
+
+#     # rename the process
+#     setproctitle(proc)
+
+#     # create new context since we forked
+#     messaging.context = messaging.Context()
+
+#     # add daemon name tag to logs
+#     cloudlog.bind(daemon=name)
+#     sentry.set_tag("daemon", name)
+
+#     # exec the process
+#     mod.main()
+#   except KeyboardInterrupt:
+#     cloudlog.warning(f"child {proc} got SIGINT")
+#   except Exception as e:
+#     # can't install the crash handler because sys.excepthook doesn't play nice
+#     # with threads, so catch it here.
+#     with open(log_path, 'a') as file: file.write(str(e)+"\n")
+#     sentry.capture_exception()
+#     raise
+
+
+def launcher(proc: str, name: str) -> None:
+  try:
+    # Import the process module
+    mod = importlib.import_module(proc)
+
+    # Path for logging
+    global _log_dir
+    log_path = os.path.join(_log_dir, f"{name}.log")
+
+    # Rename the process
+    setproctitle(name)
+
+    # Create new context since we forked
+    messaging.context = messaging.Context()
+
+    # Add daemon name tag to logs
+    cloudlog.bind(daemon=name)
+    sentry.set_tag("daemon", name)
+
+    # Command construction
+    command = f"bash -c 'python -m {proc} 2>&1 | tee {log_path}'"
+
+    # Execute the command
+    subprocess.run(command, shell=True, executable='/bin/bash', cwd=os.path.dirname(mod.__file__))
+
+  except KeyboardInterrupt:
+    cloudlog.warning(f"child {proc} got SIGINT")
+  except Exception as e:
+    with open(log_path, 'a') as file:
+      file.write(str(e) + "\n")
+    sentry.capture_exception()
+    raise
+
+def nativelauncher(pargs: list[str], cwd: str, name: str) -> None:
+  os.environ['MANAGER_DAEMON'] = name
+
+  global _log_dir
+  log_path = os.path.join(_log_dir, f"{name}.log")
+
+  # Command construction
+  command = f"bash -c \"{ ' '.join(pargs) } 2>&1 | tee {log_path}\""
+
+  # Execute the command in the specified directory
+  subprocess.run(command, shell=True, cwd=cwd, executable='/bin/bash')
+
+
+def join_process(process: Process, timeout: float) -> None:
+  # Process().join(timeout) will hang due to a python 3 bug: https://bugs.python.org/issue28382
+  # We have to poll the exitcode instead
+  t = time.monotonic()
+  while time.monotonic() - t < timeout and process.exitcode is None:
+    time.sleep(0.001)
+
+
+class ManagerProcess(ABC):
+  daemon = False
+  sigkill = False
+  should_run: Callable[[bool, Params, car.CarParams], bool]
+  proc: Process | None = None
+  enabled = True
+  name = ""
+
+  last_watchdog_time = 0
+  watchdog_max_dt: int | None = None
+  watchdog_seen = False
+  shutting_down = False
+
+  @abstractmethod
+  def prepare(self) -> None:
+    pass
+
+  @abstractmethod
+  def start(self) -> None:
+    pass
+
+  def restart(self) -> None:
+    self.stop(sig=signal.SIGKILL)
+    self.start()
+
+  def check_watchdog(self, started: bool) -> None:
+    if self.watchdog_max_dt is None or self.proc is None:
+      return
+
+    try:
+      fn = WATCHDOG_FN + str(self.proc.pid)
+      with open(fn, "rb") as f:
+        # TODO: why can't pylint find struct.unpack?
+        self.last_watchdog_time = struct.unpack('Q', f.read())[0]
+    except Exception:
+      pass
+
+    dt = time.monotonic() - self.last_watchdog_time / 1e9
+
+    if dt > self.watchdog_max_dt:
+      if (self.watchdog_seen or self.always_watchdog and self.proc.exitcode is not None) and ENABLE_WATCHDOG:
+        cloudlog.error(f"Watchdog timeout for {self.name} (exitcode {self.proc.exitcode}) restarting ({started=})")
+        self.restart()
+    else:
+      self.watchdog_seen = True
+
+  def stop(self, retry: bool = True, block: bool = True, sig: signal.Signals = None) -> int | None:
+    if self.proc is None:
+      return None
+
+    if self.proc.exitcode is None:
+      if not self.shutting_down:
+        cloudlog.info(f"killing {self.name}")
+        if sig is None:
+          sig = signal.SIGKILL if self.sigkill else signal.SIGINT
+        self.signal(sig)
+        self.shutting_down = True
+
+      if not block:
+        return None
+
+      join_process(self.proc, 5)
+
+      # If process failed to die send SIGKILL
+      if self.proc.exitcode is None and retry:
+        cloudlog.info(f"killing {self.name} with SIGKILL")
+        self.signal(signal.SIGKILL)
+        self.proc.join()
+
+    ret = self.proc.exitcode
+    cloudlog.info(f"{self.name} is dead with {ret}")
+
+    if self.proc.exitcode is not None:
+      self.shutting_down = False
+      self.proc = None
+
+    return ret
+
+  def signal(self, sig: int) -> None:
+    if self.proc is None:
+      return
+
+    # Don't signal if already exited
+    if self.proc.exitcode is not None and self.proc.pid is not None:
+      return
+
+    # Can't signal if we don't have a pid
+    if self.proc.pid is None:
+      return
+
+    cloudlog.info(f"sending signal {sig} to {self.name}")
+    os.kill(self.proc.pid, sig)
+
+  def get_process_state_msg(self):
+    state = log.ManagerState.ProcessState.new_message()
+    state.name = self.name
+    if self.proc:
+      state.running = self.proc.is_alive()
+      state.shouldBeRunning = self.proc is not None and not self.shutting_down
+      state.pid = self.proc.pid or 0
+      state.exitCode = self.proc.exitcode or 0
+    return state
+
+
+class NativeProcess(ManagerProcess):
+  def __init__(self, name, cwd, cmdline, should_run, enabled=True, sigkill=False, watchdog_max_dt=None, always_watchdog=False):
+    self.name = name
+    self.cwd = cwd
+    self.cmdline = cmdline
+    self.should_run = should_run
+    self.enabled = enabled
+    self.sigkill = sigkill
+    self.watchdog_max_dt = watchdog_max_dt
+    self.launcher = nativelauncher
+    self.always_watchdog = always_watchdog
+
+  def prepare(self) -> None:
+    pass
+
+  def start(self) -> None:
+    # In case we only tried a non blocking stop we need to stop it before restarting
+    if self.shutting_down:
+      self.stop()
+
+    if self.proc is not None:
+      return
+
+    global _log_dir
+    log_path = _log_dir+"/"+self.name+".log"
+
+    cwd = os.path.join(BASEDIR, self.cwd)
+    cloudlog.info(f"starting process {self.name}")
+    self.proc = Process(name=self.name, target=self.launcher, args=(self.cmdline, cwd, self.name))
+    self.proc.start()
+    self.watchdog_seen = False
+    self.shutting_down = False
+
+
+class PythonProcess(ManagerProcess):
+  def __init__(self, name, module, should_run, enabled=True, sigkill=False, watchdog_max_dt=None):
+    self.name = name
+    self.module = module
+    self.should_run = should_run
+    self.enabled = enabled
+    self.sigkill = sigkill
+    self.watchdog_max_dt = watchdog_max_dt
+    self.launcher = launcher
+
+  def prepare(self) -> None:
+    if self.enabled:
+      cloudlog.info(f"preimporting {self.module}")
+      importlib.import_module(self.module)
+
+  def start(self) -> None:
+    # In case we only tried a non blocking stop we need to stop it before restarting
+    if self.shutting_down:
+      self.stop()
+
+    if self.proc is not None:
+      return
+
+    global _log_dir
+    log_path = _log_dir+"/"+self.name+".log"
+    cloudlog.info(f"starting python {self.module}")
+    self.proc = Process(name=self.name, target=self.launcher, args=(self.module, self.name))
+    self.proc.start()
+    self.watchdog_seen = False
+    self.shutting_down = False
+
+
+class DaemonProcess(ManagerProcess):
+  """Python process that has to stay running across manager restart.
+  This is used for athena so you don't lose SSH access when restarting manager."""
+  def __init__(self, name, module, param_name, enabled=True):
+    self.name = name
+    self.module = module
+    self.param_name = param_name
+    self.enabled = enabled
+    self.params = None
+
+
+  @staticmethod
+  def should_run(started, params, CP):
+    return True
+
+  def prepare(self) -> None:
+    pass
+
+  def start(self) -> None:
+    if self.params is None:
+      self.params = Params()
+
+    global _log_dir
+    log_path = _log_dir+"/"+self.name+".log"
+
+    pid = self.params.get(self.param_name, encoding='utf-8')
+    if pid is not None:
+      try:
+        os.kill(int(pid), 0)
+        with open(f'/proc/{pid}/cmdline') as f:
+          if self.module in f.read():
+            # daemon is running
+            return
+      except (OSError, FileNotFoundError):
+        # process is dead
+        pass
+
+    cloudlog.info(f"starting daemon {self.name}")
+    proc = subprocess.Popen(['python', '-m', self.module],
+                            stdin=open('/dev/null'),
+                            stdout=open(log_path, 'a'),
+                            stderr=subprocess.STDOUT,
+                            preexec_fn=os.setpgrp)
+
+    self.params.put(self.param_name, str(proc.pid))
+
+  def stop(self, retry=True, block=True, sig=None) -> None:
+    pass
+
+
+def ensure_running(procs: ValuesView[ManagerProcess], started: bool, params=None, CP: car.CarParams=None,
+                   not_run: list[str] | None=None) -> list[ManagerProcess]:
+
+  if not_run is None:
+    not_run = []
+
+  running = []
+  for p in procs:
+    if p.enabled and p.name not in not_run and p.should_run(started, params, CP):
+      running.append(p)
+    else:
+      p.stop(block=False)
+
+    p.check_watchdog(started)
+
+  for p in running:
+    p.start()
+
+  return running