wip
This commit is contained in:
@@ -25,81 +25,40 @@ timestamp = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
|
||||
_log_dir = f"/data/log2/{timestamp}"
|
||||
os.makedirs(_log_dir, exist_ok=True)
|
||||
|
||||
# def launcher(proc: str, name: str, log_path: str) -> None:
|
||||
# try:
|
||||
# # import the process
|
||||
# mod = importlib.import_module(proc)
|
||||
|
||||
# global _log_dir
|
||||
# log_path = os.path.join(_log_dir, f"{name}.log")
|
||||
|
||||
# # rename the process
|
||||
# setproctitle(proc)
|
||||
|
||||
# # create new context since we forked
|
||||
# messaging.context = messaging.Context()
|
||||
|
||||
# # add daemon name tag to logs
|
||||
# cloudlog.bind(daemon=name)
|
||||
# sentry.set_tag("daemon", name)
|
||||
|
||||
# # exec the process
|
||||
# mod.main()
|
||||
# except KeyboardInterrupt:
|
||||
# cloudlog.warning(f"child {proc} got SIGINT")
|
||||
# except Exception as e:
|
||||
# # can't install the crash handler because sys.excepthook doesn't play nice
|
||||
# # with threads, so catch it here.
|
||||
# with open(log_path, 'a') as file: file.write(str(e)+"\n")
|
||||
# sentry.capture_exception()
|
||||
# raise
|
||||
|
||||
|
||||
def launcher(proc: str, name: str) -> None:
|
||||
try:
|
||||
# Import the process module
|
||||
mod = importlib.import_module(proc)
|
||||
# import the process
|
||||
mod = importlib.import_module(proc)
|
||||
|
||||
# Path for logging
|
||||
global _log_dir
|
||||
log_path = os.path.join(_log_dir, f"{name}.log")
|
||||
# rename the process
|
||||
setproctitle(proc)
|
||||
|
||||
# Rename the process
|
||||
setproctitle(name)
|
||||
# create new context since we forked
|
||||
messaging.context = messaging.Context()
|
||||
|
||||
# Create new context since we forked
|
||||
messaging.context = messaging.Context()
|
||||
|
||||
# Add daemon name tag to logs
|
||||
cloudlog.bind(daemon=name)
|
||||
sentry.set_tag("daemon", name)
|
||||
|
||||
# Command construction
|
||||
command = f"bash -c 'python -m {proc} 2>&1 | tee {log_path}'"
|
||||
|
||||
# Execute the command
|
||||
subprocess.run(command, shell=True, executable='/bin/bash', cwd=os.path.dirname(mod.__file__))
|
||||
# add daemon name tag to logs
|
||||
cloudlog.bind(daemon=name)
|
||||
sentry.set_tag("daemon", name)
|
||||
|
||||
# exec the process
|
||||
mod.main()
|
||||
except KeyboardInterrupt:
|
||||
cloudlog.warning(f"child {proc} got SIGINT")
|
||||
except Exception as e:
|
||||
with open(log_path, 'a') as file:
|
||||
file.write(str(e) + "\n")
|
||||
sentry.capture_exception()
|
||||
raise
|
||||
cloudlog.warning(f"child {proc} got SIGINT")
|
||||
except Exception:
|
||||
# can't install the crash handler because sys.excepthook doesn't play nice
|
||||
# with threads, so catch it here.
|
||||
sentry.capture_exception()
|
||||
raise
|
||||
|
||||
|
||||
def nativelauncher(pargs: list[str], cwd: str, name: str) -> None:
|
||||
os.environ['MANAGER_DAEMON'] = name
|
||||
|
||||
global _log_dir
|
||||
log_path = os.path.join(_log_dir, f"{name}.log")
|
||||
|
||||
# Command construction
|
||||
command = f"bash -c \"{ ' '.join(pargs) } 2>&1 | tee {log_path}\""
|
||||
|
||||
# Execute the command in the specified directory
|
||||
subprocess.run(command, shell=True, cwd=cwd, executable='/bin/bash')
|
||||
|
||||
# exec the process
|
||||
os.chdir(cwd)
|
||||
os.execvp(pargs[0], pargs)
|
||||
|
||||
def join_process(process: Process, timeout: float) -> None:
|
||||
# Process().join(timeout) will hang due to a python 3 bug: https://bugs.python.org/issue28382
|
||||
|
||||
350
selfdrive/manager/process_wip2.txt
Normal file
350
selfdrive/manager/process_wip2.txt
Normal file
@@ -0,0 +1,350 @@
|
||||
import importlib
|
||||
import os
|
||||
import signal
|
||||
import struct
|
||||
import datetime
|
||||
import time
|
||||
import subprocess
|
||||
from collections.abc import Callable, ValuesView
|
||||
from abc import ABC, abstractmethod
|
||||
from multiprocessing import Process
|
||||
|
||||
from setproctitle import setproctitle
|
||||
|
||||
from cereal import car, log
|
||||
import cereal.messaging as messaging
|
||||
import openpilot.selfdrive.sentry as sentry
|
||||
from openpilot.common.basedir import BASEDIR
|
||||
from openpilot.common.params import Params
|
||||
from openpilot.common.swaglog import cloudlog
|
||||
|
||||
WATCHDOG_FN = "/dev/shm/wd_"
|
||||
ENABLE_WATCHDOG = os.getenv("NO_WATCHDOG") is None
|
||||
|
||||
timestamp = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
|
||||
_log_dir = f"/data/log2/{timestamp}"
|
||||
os.makedirs(_log_dir, exist_ok=True)
|
||||
|
||||
# def launcher(proc: str, name: str, log_path: str) -> None:
|
||||
# try:
|
||||
# # import the process
|
||||
# mod = importlib.import_module(proc)
|
||||
|
||||
# global _log_dir
|
||||
# log_path = os.path.join(_log_dir, f"{name}.log")
|
||||
|
||||
# # rename the process
|
||||
# setproctitle(proc)
|
||||
|
||||
# # create new context since we forked
|
||||
# messaging.context = messaging.Context()
|
||||
|
||||
# # add daemon name tag to logs
|
||||
# cloudlog.bind(daemon=name)
|
||||
# sentry.set_tag("daemon", name)
|
||||
|
||||
# # exec the process
|
||||
# mod.main()
|
||||
# except KeyboardInterrupt:
|
||||
# cloudlog.warning(f"child {proc} got SIGINT")
|
||||
# except Exception as e:
|
||||
# # can't install the crash handler because sys.excepthook doesn't play nice
|
||||
# # with threads, so catch it here.
|
||||
# with open(log_path, 'a') as file: file.write(str(e)+"\n")
|
||||
# sentry.capture_exception()
|
||||
# raise
|
||||
|
||||
|
||||
def launcher(proc: str, name: str) -> None:
|
||||
try:
|
||||
# Import the process module
|
||||
mod = importlib.import_module(proc)
|
||||
|
||||
# Path for logging
|
||||
global _log_dir
|
||||
log_path = os.path.join(_log_dir, f"{name}.log")
|
||||
|
||||
# Rename the process
|
||||
setproctitle(name)
|
||||
|
||||
# Create new context since we forked
|
||||
messaging.context = messaging.Context()
|
||||
|
||||
# Add daemon name tag to logs
|
||||
cloudlog.bind(daemon=name)
|
||||
sentry.set_tag("daemon", name)
|
||||
|
||||
# Command construction
|
||||
command = f"bash -c 'python -m {proc} 2>&1 | tee {log_path}'"
|
||||
|
||||
# Execute the command
|
||||
subprocess.run(command, shell=True, executable='/bin/bash', cwd=os.path.dirname(mod.__file__))
|
||||
|
||||
except KeyboardInterrupt:
|
||||
cloudlog.warning(f"child {proc} got SIGINT")
|
||||
except Exception as e:
|
||||
with open(log_path, 'a') as file:
|
||||
file.write(str(e) + "\n")
|
||||
sentry.capture_exception()
|
||||
raise
|
||||
|
||||
def nativelauncher(pargs: list[str], cwd: str, name: str) -> None:
|
||||
os.environ['MANAGER_DAEMON'] = name
|
||||
|
||||
global _log_dir
|
||||
log_path = os.path.join(_log_dir, f"{name}.log")
|
||||
|
||||
# Command construction
|
||||
command = f"bash -c \"{ ' '.join(pargs) } 2>&1 | tee {log_path}\""
|
||||
|
||||
# Execute the command in the specified directory
|
||||
subprocess.run(command, shell=True, cwd=cwd, executable='/bin/bash')
|
||||
|
||||
|
||||
def join_process(process: Process, timeout: float) -> None:
|
||||
# Process().join(timeout) will hang due to a python 3 bug: https://bugs.python.org/issue28382
|
||||
# We have to poll the exitcode instead
|
||||
t = time.monotonic()
|
||||
while time.monotonic() - t < timeout and process.exitcode is None:
|
||||
time.sleep(0.001)
|
||||
|
||||
|
||||
class ManagerProcess(ABC):
|
||||
daemon = False
|
||||
sigkill = False
|
||||
should_run: Callable[[bool, Params, car.CarParams], bool]
|
||||
proc: Process | None = None
|
||||
enabled = True
|
||||
name = ""
|
||||
|
||||
last_watchdog_time = 0
|
||||
watchdog_max_dt: int | None = None
|
||||
watchdog_seen = False
|
||||
shutting_down = False
|
||||
|
||||
@abstractmethod
|
||||
def prepare(self) -> None:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def start(self) -> None:
|
||||
pass
|
||||
|
||||
def restart(self) -> None:
|
||||
self.stop(sig=signal.SIGKILL)
|
||||
self.start()
|
||||
|
||||
def check_watchdog(self, started: bool) -> None:
|
||||
if self.watchdog_max_dt is None or self.proc is None:
|
||||
return
|
||||
|
||||
try:
|
||||
fn = WATCHDOG_FN + str(self.proc.pid)
|
||||
with open(fn, "rb") as f:
|
||||
# TODO: why can't pylint find struct.unpack?
|
||||
self.last_watchdog_time = struct.unpack('Q', f.read())[0]
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
dt = time.monotonic() - self.last_watchdog_time / 1e9
|
||||
|
||||
if dt > self.watchdog_max_dt:
|
||||
if (self.watchdog_seen or self.always_watchdog and self.proc.exitcode is not None) and ENABLE_WATCHDOG:
|
||||
cloudlog.error(f"Watchdog timeout for {self.name} (exitcode {self.proc.exitcode}) restarting ({started=})")
|
||||
self.restart()
|
||||
else:
|
||||
self.watchdog_seen = True
|
||||
|
||||
def stop(self, retry: bool = True, block: bool = True, sig: signal.Signals = None) -> int | None:
|
||||
if self.proc is None:
|
||||
return None
|
||||
|
||||
if self.proc.exitcode is None:
|
||||
if not self.shutting_down:
|
||||
cloudlog.info(f"killing {self.name}")
|
||||
if sig is None:
|
||||
sig = signal.SIGKILL if self.sigkill else signal.SIGINT
|
||||
self.signal(sig)
|
||||
self.shutting_down = True
|
||||
|
||||
if not block:
|
||||
return None
|
||||
|
||||
join_process(self.proc, 5)
|
||||
|
||||
# If process failed to die send SIGKILL
|
||||
if self.proc.exitcode is None and retry:
|
||||
cloudlog.info(f"killing {self.name} with SIGKILL")
|
||||
self.signal(signal.SIGKILL)
|
||||
self.proc.join()
|
||||
|
||||
ret = self.proc.exitcode
|
||||
cloudlog.info(f"{self.name} is dead with {ret}")
|
||||
|
||||
if self.proc.exitcode is not None:
|
||||
self.shutting_down = False
|
||||
self.proc = None
|
||||
|
||||
return ret
|
||||
|
||||
def signal(self, sig: int) -> None:
|
||||
if self.proc is None:
|
||||
return
|
||||
|
||||
# Don't signal if already exited
|
||||
if self.proc.exitcode is not None and self.proc.pid is not None:
|
||||
return
|
||||
|
||||
# Can't signal if we don't have a pid
|
||||
if self.proc.pid is None:
|
||||
return
|
||||
|
||||
cloudlog.info(f"sending signal {sig} to {self.name}")
|
||||
os.kill(self.proc.pid, sig)
|
||||
|
||||
def get_process_state_msg(self):
|
||||
state = log.ManagerState.ProcessState.new_message()
|
||||
state.name = self.name
|
||||
if self.proc:
|
||||
state.running = self.proc.is_alive()
|
||||
state.shouldBeRunning = self.proc is not None and not self.shutting_down
|
||||
state.pid = self.proc.pid or 0
|
||||
state.exitCode = self.proc.exitcode or 0
|
||||
return state
|
||||
|
||||
|
||||
class NativeProcess(ManagerProcess):
|
||||
def __init__(self, name, cwd, cmdline, should_run, enabled=True, sigkill=False, watchdog_max_dt=None, always_watchdog=False):
|
||||
self.name = name
|
||||
self.cwd = cwd
|
||||
self.cmdline = cmdline
|
||||
self.should_run = should_run
|
||||
self.enabled = enabled
|
||||
self.sigkill = sigkill
|
||||
self.watchdog_max_dt = watchdog_max_dt
|
||||
self.launcher = nativelauncher
|
||||
self.always_watchdog = always_watchdog
|
||||
|
||||
def prepare(self) -> None:
|
||||
pass
|
||||
|
||||
def start(self) -> None:
|
||||
# In case we only tried a non blocking stop we need to stop it before restarting
|
||||
if self.shutting_down:
|
||||
self.stop()
|
||||
|
||||
if self.proc is not None:
|
||||
return
|
||||
|
||||
global _log_dir
|
||||
log_path = _log_dir+"/"+self.name+".log"
|
||||
|
||||
cwd = os.path.join(BASEDIR, self.cwd)
|
||||
cloudlog.info(f"starting process {self.name}")
|
||||
self.proc = Process(name=self.name, target=self.launcher, args=(self.cmdline, cwd, self.name))
|
||||
self.proc.start()
|
||||
self.watchdog_seen = False
|
||||
self.shutting_down = False
|
||||
|
||||
|
||||
class PythonProcess(ManagerProcess):
|
||||
def __init__(self, name, module, should_run, enabled=True, sigkill=False, watchdog_max_dt=None):
|
||||
self.name = name
|
||||
self.module = module
|
||||
self.should_run = should_run
|
||||
self.enabled = enabled
|
||||
self.sigkill = sigkill
|
||||
self.watchdog_max_dt = watchdog_max_dt
|
||||
self.launcher = launcher
|
||||
|
||||
def prepare(self) -> None:
|
||||
if self.enabled:
|
||||
cloudlog.info(f"preimporting {self.module}")
|
||||
importlib.import_module(self.module)
|
||||
|
||||
def start(self) -> None:
|
||||
# In case we only tried a non blocking stop we need to stop it before restarting
|
||||
if self.shutting_down:
|
||||
self.stop()
|
||||
|
||||
if self.proc is not None:
|
||||
return
|
||||
|
||||
global _log_dir
|
||||
log_path = _log_dir+"/"+self.name+".log"
|
||||
cloudlog.info(f"starting python {self.module}")
|
||||
self.proc = Process(name=self.name, target=self.launcher, args=(self.module, self.name))
|
||||
self.proc.start()
|
||||
self.watchdog_seen = False
|
||||
self.shutting_down = False
|
||||
|
||||
|
||||
class DaemonProcess(ManagerProcess):
|
||||
"""Python process that has to stay running across manager restart.
|
||||
This is used for athena so you don't lose SSH access when restarting manager."""
|
||||
def __init__(self, name, module, param_name, enabled=True):
|
||||
self.name = name
|
||||
self.module = module
|
||||
self.param_name = param_name
|
||||
self.enabled = enabled
|
||||
self.params = None
|
||||
|
||||
|
||||
@staticmethod
|
||||
def should_run(started, params, CP):
|
||||
return True
|
||||
|
||||
def prepare(self) -> None:
|
||||
pass
|
||||
|
||||
def start(self) -> None:
|
||||
if self.params is None:
|
||||
self.params = Params()
|
||||
|
||||
global _log_dir
|
||||
log_path = _log_dir+"/"+self.name+".log"
|
||||
|
||||
pid = self.params.get(self.param_name, encoding='utf-8')
|
||||
if pid is not None:
|
||||
try:
|
||||
os.kill(int(pid), 0)
|
||||
with open(f'/proc/{pid}/cmdline') as f:
|
||||
if self.module in f.read():
|
||||
# daemon is running
|
||||
return
|
||||
except (OSError, FileNotFoundError):
|
||||
# process is dead
|
||||
pass
|
||||
|
||||
cloudlog.info(f"starting daemon {self.name}")
|
||||
proc = subprocess.Popen(['python', '-m', self.module],
|
||||
stdin=open('/dev/null'),
|
||||
stdout=open(log_path, 'a'),
|
||||
stderr=subprocess.STDOUT,
|
||||
preexec_fn=os.setpgrp)
|
||||
|
||||
self.params.put(self.param_name, str(proc.pid))
|
||||
|
||||
def stop(self, retry=True, block=True, sig=None) -> None:
|
||||
pass
|
||||
|
||||
|
||||
def ensure_running(procs: ValuesView[ManagerProcess], started: bool, params=None, CP: car.CarParams=None,
|
||||
not_run: list[str] | None=None) -> list[ManagerProcess]:
|
||||
|
||||
if not_run is None:
|
||||
not_run = []
|
||||
|
||||
running = []
|
||||
for p in procs:
|
||||
if p.enabled and p.name not in not_run and p.should_run(started, params, CP):
|
||||
running.append(p)
|
||||
else:
|
||||
p.stop(block=False)
|
||||
|
||||
p.check_watchdog(started)
|
||||
|
||||
for p in running:
|
||||
p.start()
|
||||
|
||||
return running
|
||||
Reference in New Issue
Block a user