This commit is contained in:
Your Name
2024-06-17 12:12:54 -05:00
parent 75c91213a2
commit a2cf02ea37
2 changed files with 365 additions and 71 deletions

View File

@@ -19,44 +19,50 @@ from openpilot.common.swaglog import cloudlog
WATCHDOG_FN = "/dev/shm/wd_"
ENABLE_WATCHDOG = os.getenv("NO_WATCHDOG") is None
ENABLE_WATCHDOG = False # Fixme
_log_dir = ""
def nativelauncher(pargs: list[str], cwd: str, name: str, log_path: str) -> None:
os.environ['MANAGER_DAEMON'] = name
with open(log_path, 'a') as log_file:
os.chdir(cwd)
proc = subprocess.Popen(pargs, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1, universal_newlines=True)
log_file.write("Started "+name)
for line in proc.stdout:
print(line, end='')
log_file.write(line)
proc.wait()
def launcher(proc: str, name: str, log_path: str) -> None:
for _ in iter(int, 1):
try:
mod = importlib.import_module(proc)
setproctitle(proc)
messaging.context = messaging.Context()
cloudlog.bind(daemon=name)
sentry.set_tag("daemon", name)
with open(log_path, 'a') as log_file, subprocess.Popen(['python', '-m', proc], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1, universal_newlines=True) as proc:
log_file.write("Started "+name)
for line in proc.stdout:
print(line, end='')
log_file.write(line)
proc.wait()
except Exception as e:
print ("Fatal: "+name)
print (e)
sentry.capture_exception()
def launcher(proc: str, name: str) -> None:
try:
# import the process
mod = importlib.import_module(proc)
# rename the process
setproctitle(proc)
# create new context since we forked
messaging.context = messaging.Context()
# add daemon name tag to logs
cloudlog.bind(daemon=name)
sentry.set_tag("daemon", name)
# exec the process
mod.main()
except KeyboardInterrupt:
cloudlog.warning(f"child {proc} got SIGINT")
except Exception:
# can't install the crash handler because sys.excepthook doesn't play nice
# with threads, so catch it here.
sentry.capture_exception()
raise
def nativelauncher(pargs: list[str], cwd: str, name: str) -> None:
os.environ['MANAGER_DAEMON'] = name
# exec the process
os.chdir(cwd)
os.execvp(pargs[0], pargs)
def join_process(process: Process, timeout: float) -> None:
# Process().join(timeout) will hang due to a python 3 bug: https://bugs.python.org/issue28382
# We have to poll the exitcode instead
t = time.monotonic()
while time.monotonic() - t < timeout and process.exitcode is None:
time.sleep(0.001)
class ManagerProcess(ABC):
daemon = False
sigkill = False
@@ -64,6 +70,7 @@ class ManagerProcess(ABC):
proc: Process | None = None
enabled = True
name = ""
last_watchdog_time = 0
watchdog_max_dt: int | None = None
watchdog_seen = False
@@ -78,20 +85,23 @@ class ManagerProcess(ABC):
pass
def restart(self) -> None:
if self.proc is not None and self.proc.exitcode is not None:
self.stop(sig=signal.SIGKILL, block=False)
self.stop(sig=signal.SIGKILL)
self.start()
def check_watchdog(self, started: bool) -> None:
if self.watchdog_max_dt is None or self.proc is None:
return
try:
fn = WATCHDOG_FN + str(self.proc.pid)
with open(fn, "rb") as f:
# TODO: why can't pylint find struct.unpack?
self.last_watchdog_time = struct.unpack('Q', f.read())[0]
except Exception:
pass
dt = time.monotonic() - self.last_watchdog_time / 1e9
if dt > self.watchdog_max_dt:
if (self.watchdog_seen or self.always_watchdog and self.proc.exitcode is not None) and ENABLE_WATCHDOG:
cloudlog.error(f"Watchdog timeout for {self.name} (exitcode {self.proc.exitcode}) restarting ({started=})")
@@ -102,6 +112,7 @@ class ManagerProcess(ABC):
def stop(self, retry: bool = True, block: bool = True, sig: signal.Signals = None) -> int | None:
if self.proc is None:
return None
if self.proc.exitcode is None:
if not self.shutting_down:
cloudlog.info(f"killing {self.name}")
@@ -109,23 +120,39 @@ class ManagerProcess(ABC):
sig = signal.SIGKILL if self.sigkill else signal.SIGINT
self.signal(sig)
self.shutting_down = True
if not block:
return None
join_process(self.proc, 5)
# If process failed to die send SIGKILL
if self.proc.exitcode is None and retry:
cloudlog.info(f"killing {self.name} with SIGKILL")
self.signal(signal.SIGKILL)
self.proc.join()
ret = self.proc.exitcode
cloudlog.info(f"{self.name} is dead with {ret}")
if self.proc.exitcode is not None:
self.shutting_down = False
self.proc = None
return ret
def signal(self, sig: int) -> None:
if self.proc is None or self.proc.exitcode is not None or self.proc.pid is None:
if self.proc is None:
return
# Don't signal if already exited
if self.proc.exitcode is not None and self.proc.pid is not None:
return
# Can't signal if we don't have a pid
if self.proc.pid is None:
return
cloudlog.info(f"sending signal {sig} to {self.name}")
os.kill(self.proc.pid, sig)
@@ -156,12 +183,20 @@ class NativeProcess(ManagerProcess):
pass
def start(self) -> None:
global _log_dir
log_path = _log_dir+"/"+self.name+".log"
if self.shutting_down or self.proc is not None:
return
self.proc = Process(target=nativelauncher, args=(self.cmdline, os.path.join(BASEDIR, self.cwd), self.name, log_path))
# In case we only tried a non blocking stop we need to stop it before restarting
if self.shutting_down:
self.stop()
if self.proc is not None:
return
cwd = os.path.join(BASEDIR, self.cwd)
cloudlog.info(f"starting process {self.name}")
self.proc = Process(name=self.name, target=self.launcher, args=(self.cmdline, cwd, self.name))
self.proc.start()
self.watchdog_seen = False
self.shutting_down = False
class PythonProcess(ManagerProcess):
def __init__(self, name, module, should_run, enabled=True, sigkill=False, watchdog_max_dt=None):
@@ -179,15 +214,20 @@ class PythonProcess(ManagerProcess):
importlib.import_module(self.module)
def start(self) -> None:
global _log_dir
log_path = _log_dir+"/"+self.name+".log"
if self.shutting_down or self.proc is not None:
# In case we only tried a non blocking stop we need to stop it before restarting
if self.shutting_down:
self.stop()
if self.proc is not None:
return
self.proc = Process(name=self.name, target=launcher, args=(self.module, self.name, log_path))
cloudlog.info(f"starting python {self.module}")
self.proc = Process(name=self.name, target=self.launcher, args=(self.module, self.name))
self.proc.start()
self.watchdog_seen = False
self.shutting_down = False
class DaemonProcess(ManagerProcess):
"""Python process that has to stay running across manager restart.
This is used for athena so you don't lose SSH access when restarting manager."""
@@ -206,46 +246,49 @@ class DaemonProcess(ManagerProcess):
pass
def start(self) -> None:
global _log_dir
log_path = _log_dir+"/"+self.name+".log"
if self.params is None:
self.params = Params()
self.params = Params()
pid = self.params.get(self.param_name, encoding='utf-8')
if pid is not None:
try:
os.kill(int(pid), 0)
return # Process is already running
except OSError:
pass # Process not running, continue to start it
try:
os.kill(int(pid), 0)
with open(f'/proc/{pid}/cmdline') as f:
if self.module in f.read():
# daemon is running
return
except (OSError, FileNotFoundError):
# process is dead
pass
cloudlog.info(f"starting daemon {self.name}")
self.proc = subprocess.Popen(['python', '-m', self.module],
stdin=open('/dev/null'),
stdout=open(log_path, 'a'),
stderr=subprocess.STDOUT,
preexec_fn=os.setpgrp)
self.params.put(self.param_name, str(self.proc.pid))
proc = subprocess.Popen(['python', '-m', self.module],
stdin=open('/dev/null'),
stdout=open('/dev/null', 'w'),
stderr=open('/dev/null', 'w'),
preexec_fn=os.setpgrp)
self.params.put(self.param_name, str(proc.pid))
def stop(self, retry=True, block=True, sig=None) -> None:
pass
def ensure_running(procs: ValuesView[ManagerProcess], started: bool, params=None, CP: car.CarParams=None, not_run: list[str] | None=None, log_dir: str = None) -> list[ManagerProcess]:
global _log_dir
_log_dir = log_dir
if not_run is None:
not_run = []
def ensure_running(procs: ValuesView[ManagerProcess], started: bool, params=None, CP: car.CarParams=None,
not_run: list[str] | None=None) -> list[ManagerProcess]:
if not_run is None:
not_run = []
running = []
for p in procs:
if p.enabled and p.name not in not_run and p.should_run(started, params, CP):
if p.proc is None or (hasattr(p.proc, 'exitcode') and p.proc.exitcode is not None):
p.start()
running.append(p)
else:
p.stop(block=False)
running = []
for p in procs:
if p.enabled and p.name not in not_run and p.should_run(started, params, CP):
running.append(p)
else:
p.stop(block=False)
p.check_watchdog(started)
p.check_watchdog(started)
return running
for p in running:
p.start()
return running

View File

@@ -0,0 +1,251 @@
import importlib
import os
import signal
import struct
import time
import subprocess
from collections.abc import Callable, ValuesView
from abc import ABC, abstractmethod
from multiprocessing import Process
from setproctitle import setproctitle
from cereal import car, log
import cereal.messaging as messaging
import openpilot.selfdrive.sentry as sentry
from openpilot.common.basedir import BASEDIR
from openpilot.common.params import Params
from openpilot.common.swaglog import cloudlog
WATCHDOG_FN = "/dev/shm/wd_"
ENABLE_WATCHDOG = os.getenv("NO_WATCHDOG") is None
ENABLE_WATCHDOG = False # Fixme
_log_dir = ""
def nativelauncher(pargs: list[str], cwd: str, name: str, log_path: str) -> None:
os.environ['MANAGER_DAEMON'] = name
with open(log_path, 'a') as log_file:
os.chdir(cwd)
proc = subprocess.Popen(pargs, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1, universal_newlines=True)
log_file.write("Started "+name)
for line in proc.stdout:
print(line, end='')
log_file.write(line)
proc.wait()
def launcher(proc: str, name: str, log_path: str) -> None:
for _ in iter(int, 1):
try:
mod = importlib.import_module(proc)
setproctitle(proc)
messaging.context = messaging.Context()
cloudlog.bind(daemon=name)
sentry.set_tag("daemon", name)
with open(log_path, 'a') as log_file, subprocess.Popen(['python', '-m', proc], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1, universal_newlines=True) as proc:
log_file.write("Started "+name)
for line in proc.stdout:
print(line, end='')
log_file.write(line)
proc.wait()
except Exception as e:
print ("Fatal: "+name)
print (e)
sentry.capture_exception()
def join_process(process: Process, timeout: float) -> None:
t = time.monotonic()
while time.monotonic() - t < timeout and process.exitcode is None:
time.sleep(0.001)
class ManagerProcess(ABC):
daemon = False
sigkill = False
should_run: Callable[[bool, Params, car.CarParams], bool]
proc: Process | None = None
enabled = True
name = ""
last_watchdog_time = 0
watchdog_max_dt: int | None = None
watchdog_seen = False
shutting_down = False
@abstractmethod
def prepare(self) -> None:
pass
@abstractmethod
def start(self) -> None:
pass
def restart(self) -> None:
if self.proc is not None and self.proc.exitcode is not None:
self.stop(sig=signal.SIGKILL, block=False)
self.start()
def check_watchdog(self, started: bool) -> None:
if self.watchdog_max_dt is None or self.proc is None:
return
try:
fn = WATCHDOG_FN + str(self.proc.pid)
with open(fn, "rb") as f:
self.last_watchdog_time = struct.unpack('Q', f.read())[0]
except Exception:
pass
dt = time.monotonic() - self.last_watchdog_time / 1e9
if dt > self.watchdog_max_dt:
if (self.watchdog_seen or self.always_watchdog and self.proc.exitcode is not None) and ENABLE_WATCHDOG:
cloudlog.error(f"Watchdog timeout for {self.name} (exitcode {self.proc.exitcode}) restarting ({started=})")
self.restart()
else:
self.watchdog_seen = True
def stop(self, retry: bool = True, block: bool = True, sig: signal.Signals = None) -> int | None:
if self.proc is None:
return None
if self.proc.exitcode is None:
if not self.shutting_down:
cloudlog.info(f"killing {self.name}")
if sig is None:
sig = signal.SIGKILL if self.sigkill else signal.SIGINT
self.signal(sig)
self.shutting_down = True
if not block:
return None
join_process(self.proc, 5)
if self.proc.exitcode is None and retry:
cloudlog.info(f"killing {self.name} with SIGKILL")
self.signal(signal.SIGKILL)
self.proc.join()
ret = self.proc.exitcode
cloudlog.info(f"{self.name} is dead with {ret}")
if self.proc.exitcode is not None:
self.shutting_down = False
self.proc = None
return ret
def signal(self, sig: int) -> None:
if self.proc is None or self.proc.exitcode is not None or self.proc.pid is None:
return
cloudlog.info(f"sending signal {sig} to {self.name}")
os.kill(self.proc.pid, sig)
def get_process_state_msg(self):
state = log.ManagerState.ProcessState.new_message()
state.name = self.name
if self.proc:
state.running = self.proc.is_alive()
state.shouldBeRunning = self.proc is not None and not self.shutting_down
state.pid = self.proc.pid or 0
state.exitCode = self.proc.exitcode or 0
return state
class NativeProcess(ManagerProcess):
def __init__(self, name, cwd, cmdline, should_run, enabled=True, sigkill=False, watchdog_max_dt=None, always_watchdog=False):
self.name = name
self.cwd = cwd
self.cmdline = cmdline
self.should_run = should_run
self.enabled = enabled
self.sigkill = sigkill
self.watchdog_max_dt = watchdog_max_dt
self.launcher = nativelauncher
self.always_watchdog = always_watchdog
def prepare(self) -> None:
pass
def start(self) -> None:
global _log_dir
log_path = _log_dir+"/"+self.name+".log"
if self.shutting_down or self.proc is not None:
return
self.proc = Process(target=nativelauncher, args=(self.cmdline, os.path.join(BASEDIR, self.cwd), self.name, log_path))
self.proc.start()
class PythonProcess(ManagerProcess):
def __init__(self, name, module, should_run, enabled=True, sigkill=False, watchdog_max_dt=None):
self.name = name
self.module = module
self.should_run = should_run
self.enabled = enabled
self.sigkill = sigkill
self.watchdog_max_dt = watchdog_max_dt
self.launcher = launcher
def prepare(self) -> None:
if self.enabled:
cloudlog.info(f"preimporting {self.module}")
importlib.import_module(self.module)
def start(self) -> None:
global _log_dir
log_path = _log_dir+"/"+self.name+".log"
if self.shutting_down or self.proc is not None:
return
self.proc = Process(name=self.name, target=launcher, args=(self.module, self.name, log_path))
self.proc.start()
self.watchdog_seen = False
self.shutting_down = False
class DaemonProcess(ManagerProcess):
"""Python process that has to stay running across manager restart.
This is used for athena so you don't lose SSH access when restarting manager."""
def __init__(self, name, module, param_name, enabled=True):
self.name = name
self.module = module
self.param_name = param_name
self.enabled = enabled
self.params = None
@staticmethod
def should_run(started, params, CP):
return True
def prepare(self) -> None:
pass
def start(self) -> None:
global _log_dir
log_path = _log_dir+"/"+self.name+".log"
if self.params is None:
self.params = Params()
pid = self.params.get(self.param_name, encoding='utf-8')
if pid is not None:
try:
os.kill(int(pid), 0)
return # Process is already running
except OSError:
pass # Process not running, continue to start it
cloudlog.info(f"starting daemon {self.name}")
self.proc = subprocess.Popen(['python', '-m', self.module],
stdin=open('/dev/null'),
stdout=open(log_path, 'a'),
stderr=subprocess.STDOUT,
preexec_fn=os.setpgrp)
self.params.put(self.param_name, str(self.proc.pid))
def stop(self, retry=True, block=True, sig=None) -> None:
pass
def ensure_running(procs: ValuesView[ManagerProcess], started: bool, params=None, CP: car.CarParams=None, not_run: list[str] | None=None, log_dir: str = None) -> list[ManagerProcess]:
global _log_dir
_log_dir = log_dir
if not_run is None:
not_run = []
running = []
for p in procs:
if p.enabled and p.name not in not_run and p.should_run(started, params, CP):
if p.proc is None or (hasattr(p.proc, 'exitcode') and p.proc.exitcode is not None):
p.start()
running.append(p)
else:
p.stop(block=False)
p.check_watchdog(started)
return running