From 720c9ccbc891e053dcd32b563a9bea52e6b6f103 Mon Sep 17 00:00:00 2001 From: concordia Date: Sat, 15 Jun 2024 20:20:54 -0500 Subject: [PATCH] wip --- selfdrive/manager/manager.py | 12 ++- selfdrive/manager/process.py | 158 ++++++++++++++--------------------- 2 files changed, 73 insertions(+), 97 deletions(-) diff --git a/selfdrive/manager/manager.py b/selfdrive/manager/manager.py index d7f736d..a704d3e 100755 --- a/selfdrive/manager/manager.py +++ b/selfdrive/manager/manager.py @@ -52,6 +52,10 @@ def frogpilot_boot_functions(frogpilot_functions): print(f"An unexpected error occurred: {e}") def manager_init(frogpilot_functions) -> None: + timestamp = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S") + log_dir = f"/data/log2/{timestamp}" + os.makedirs(log_dir, exist_ok=True) + frogpilot_boot = threading.Thread(target=frogpilot_boot_functions, args=(frogpilot_functions,)) frogpilot_boot.start() @@ -354,6 +358,8 @@ def manager_init(frogpilot_functions) -> None: for p in managed_processes.values(): p.prepare() + return log_dir + def manager_cleanup() -> None: # send signals to kill all procs @@ -368,7 +374,7 @@ def manager_cleanup() -> None: last_running = "" -def manager_thread(frogpilot_functions) -> None: +def manager_thread(frogpilot_functions, log_dir) -> None: global last_running cloudlog.bind(daemon="manager") @@ -389,7 +395,7 @@ def manager_thread(frogpilot_functions) -> None: pm = messaging.PubMaster(['managerState']) write_onroad_params(False, params) - ensure_running(managed_processes.values(), False, params=params, CP=sm['carParams'], not_run=ignore) + ensure_running(managed_processes.values(), False, params=params, CP=sm['carParams'], not_run=ignore, log_dir=log_dir) started_prev = False @@ -418,7 +424,7 @@ def manager_thread(frogpilot_functions) -> None: started_prev = started - ensure_running(managed_processes.values(), started, params=params, CP=sm['carParams'], not_run=ignore) + ensure_running(managed_processes.values(), started, params=params, CP=sm['carParams'], not_run=ignore, log_dir=log_dir) running = ' '.join("{}{}\u001b[0m".format("\u001b[32m" if p.proc.is_alive() else "\u001b[31m", p.name) for p in managed_processes.values() if p.proc) diff --git a/selfdrive/manager/process.py b/selfdrive/manager/process.py index 607d3d0..cda3922 100755 --- a/selfdrive/manager/process.py +++ b/selfdrive/manager/process.py @@ -20,39 +20,33 @@ from openpilot.common.swaglog import cloudlog WATCHDOG_FN = "/dev/shm/wd_" ENABLE_WATCHDOG = os.getenv("NO_WATCHDOG") is None +def modified_nativelauncher(pargs: list[str], cwd: str, name: str, log_path: str) -> None: + os.environ['MANAGER_DAEMON'] = name + log_file_path = os.path.join(log_path, f"{name}.log") + with open(log_file_path, 'a') as log_file: + os.chdir(cwd) + proc = subprocess.Popen(pargs, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1, universal_newlines=True) + for line in proc.stdout: + print(line, end='') + log_file.write(line) + proc.wait() -def launcher(proc: str, name: str) -> None: - try: - # import the process - mod = importlib.import_module(proc) - - # rename the process - setproctitle(proc) - - # create new context since we forked - messaging.context = messaging.Context() - - # add daemon name tag to logs - cloudlog.bind(daemon=name) - sentry.set_tag("daemon", name) - - # exec the process - mod.main() - except KeyboardInterrupt: - cloudlog.warning(f"child {proc} got SIGINT") - except Exception: - # can't install the crash handler because sys.excepthook doesn't play nice - # with threads, so catch it here. - sentry.capture_exception() - raise - - -def nativelauncher(pargs: list[str], cwd: str, name: str) -> None: - os.environ['MANAGER_DAEMON'] = name - - # exec the process - os.chdir(cwd) - os.execvp(pargs[0], pargs) +def modified_launcher(proc: str, name: str, log_path: str) -> None: + try: + mod = importlib.import_module(proc) + setproctitle(proc) + messaging.context = messaging.Context() + cloudlog.bind(daemon=name) + sentry.set_tag("daemon", name) + log_file_path = os.path.join(log_path, f"{name}.log") + with open(log_file_path, 'a') as log_file, subprocess.Popen(['python', '-m', proc], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1, universal_newlines=True) as proc: + for line in proc.stdout: + print(line, end='') + log_file.write(line) + proc.wait() + except Exception: + sentry.capture_exception() + raise def join_process(process: Process, timeout: float) -> None: @@ -182,21 +176,11 @@ class NativeProcess(ManagerProcess): def prepare(self) -> None: pass - def start(self) -> None: - # In case we only tried a non blocking stop we need to stop it before restarting - if self.shutting_down: - self.stop() - - if self.proc is not None: - return - - cwd = os.path.join(BASEDIR, self.cwd) - cloudlog.info(f"starting process {self.name}") - self.proc = Process(name=self.name, target=self.launcher, args=(self.cmdline, cwd, self.name)) + def start(self, log_path: str) -> None: + if self.shutting_down or self.proc is not None: + return + self.proc = Process(target=modified_nativelauncher, args=(self.cmdline, os.path.join(BASEDIR, self.cwd), self.name, log_path)) self.proc.start() - self.watchdog_seen = False - self.shutting_down = False - class PythonProcess(ManagerProcess): def __init__(self, name, module, should_run, enabled=True, sigkill=False, watchdog_max_dt=None): @@ -213,21 +197,14 @@ class PythonProcess(ManagerProcess): cloudlog.info(f"preimporting {self.module}") importlib.import_module(self.module) - def start(self) -> None: - # In case we only tried a non blocking stop we need to stop it before restarting - if self.shutting_down: - self.stop() - - if self.proc is not None: + def start(self, log_path: str) -> None: + if self.shutting_down or self.proc is not None: return - - cloudlog.info(f"starting python {self.module}") - self.proc = Process(name=self.name, target=self.launcher, args=(self.module, self.name)) + self.proc = Process(name=self.name, target=modified_launcher, args=(self.module, self.name, log_path)) self.proc.start() self.watchdog_seen = False self.shutting_down = False - class DaemonProcess(ManagerProcess): """Python process that has to stay running across manager restart. This is used for athena so you don't lose SSH access when restarting manager.""" @@ -245,50 +222,43 @@ class DaemonProcess(ManagerProcess): def prepare(self) -> None: pass - def start(self) -> None: - if self.params is None: - self.params = Params() + def start(self, log_path: str) -> None: + if self.params is None: + self.params = Params() - pid = self.params.get(self.param_name, encoding='utf-8') - if pid is not None: - try: - os.kill(int(pid), 0) - with open(f'/proc/{pid}/cmdline') as f: - if self.module in f.read(): - # daemon is running - return - except (OSError, FileNotFoundError): - # process is dead - pass + pid = self.params.get(self.param_name, encoding='utf-8') + if pid is not None: + try: + os.kill(int(pid), 0) + return # Process is already running + except OSError: + pass # Process not running, continue to start it - cloudlog.info(f"starting daemon {self.name}") - proc = subprocess.Popen(['python', '-m', self.module], - stdin=open('/dev/null'), - stdout=open('/dev/null', 'w'), - stderr=open('/dev/null', 'w'), - preexec_fn=os.setpgrp) - - self.params.put(self.param_name, str(proc.pid)) + log_file_path = os.path.join(log_path, f"{self.name}.log") + self.params.put(self.param_name, str(self.proc.pid)) + cloudlog.info(f"starting daemon {self.name}") + self.proc = subprocess.Popen(['python', '-m', self.module], + stdin=open('/dev/null'), + stdout=open(log_file_path, 'a'), + stderr=subprocess.STDOUT, + preexec_fn=os.setpgrp) def stop(self, retry=True, block=True, sig=None) -> None: pass +def ensure_running(procs: ValuesView[ManagerProcess], started: bool, params=None, CP: car.CarParams=None, not_run: list[str] | None=None, log_dir: str = None) -> list[ManagerProcess]: + if not_run is None: + not_run = [] -def ensure_running(procs: ValuesView[ManagerProcess], started: bool, params=None, CP: car.CarParams=None, - not_run: list[str] | None=None) -> list[ManagerProcess]: - if not_run is None: - not_run = [] + running = [] + for p in procs: + log_path = log_dir+"/"+p.name+".log" + if p.enabled and p.name not in not_run and p.should_run(started, params, CP): + p.start(log_path) + running.append(p) + else: + p.stop(block=False) - running = [] - for p in procs: - if p.enabled and p.name not in not_run and p.should_run(started, params, CP): - running.append(p) - else: - p.stop(block=False) + p.check_watchdog(started) - p.check_watchdog(started) - - for p in running: - p.start() - - return running + return running