Files
oscarpilot/selfdrive/thermald/power_monitoring.py
FrogAi e3bfe75ca2 Device shut down timer
Added toggle to shut down the device after being offroad for a specified amount of time.
2024-01-26 10:51:43 -07:00

166 lines
7.1 KiB
Python

import time
import threading
from datetime import datetime, timedelta
from typing import Optional
from openpilot.common.params import Params
from openpilot.system.hardware import HARDWARE
from openpilot.common.swaglog import cloudlog
from openpilot.selfdrive.statsd import statlog
CAR_VOLTAGE_LOW_PASS_K = 0.011 # LPF gain for 45s tau (dt/tau / (dt/tau + 1))
# While driving, a battery charges completely in about 30-60 minutes
CAR_BATTERY_CAPACITY_uWh = 30e6
CAR_CHARGING_RATE_W = 45
VBATT_PAUSE_CHARGING = 11.8 # Lower limit on the LPF car battery voltage
VBATT_INSTANT_PAUSE_CHARGING = 7.0 # Lower limit on the instant car battery voltage measurements to avoid triggering on instant power loss
MAX_TIME_OFFROAD_S = 30*3600
MIN_ON_TIME_S = 3600
DELAY_SHUTDOWN_TIME_S = 300 # Wait at least DELAY_SHUTDOWN_TIME_S seconds after offroad_time to shutdown.
VOLTAGE_SHUTDOWN_MIN_OFFROAD_TIME_S = 60
class PowerMonitoring:
def __init__(self):
self.params = Params()
self.last_measurement_time = None # Used for integration delta
self.last_save_time = 0 # Used for saving current value in a param
self.power_used_uWh = 0 # Integrated power usage in uWh since going into offroad
self.next_pulsed_measurement_time = None
self.car_voltage_mV = 12e3 # Low-passed version of peripheralState voltage
self.car_voltage_instant_mV = 12e3 # Last value of peripheralState voltage
self.integration_lock = threading.Lock()
car_battery_capacity_uWh = self.params.get("CarBatteryCapacity")
if car_battery_capacity_uWh is None:
car_battery_capacity_uWh = 0
# Reset capacity if it's low
self.car_battery_capacity_uWh = max((CAR_BATTERY_CAPACITY_uWh / 10), int(car_battery_capacity_uWh))
# FrogPilot variables
device_shutdown_setting = self.params.get_int("DeviceShutdown")
# If the toggle is set for < 1 hour, configure by 15 minute increments
self.device_shutdown_time = (device_shutdown_setting - 3) * 3600 if device_shutdown_setting >= 4 else device_shutdown_setting * (60 * 15)
self.download_schedule = self.params.get_int("UpdateSchedule")
self.download_time = self.params.get_int("UpdateTime")
# Calculation tick
def calculate(self, voltage: Optional[int], ignition: bool):
try:
now = time.monotonic()
# If peripheralState is None, we're probably not in a car, so we don't care
if voltage is None:
with self.integration_lock:
self.last_measurement_time = None
self.next_pulsed_measurement_time = None
self.power_used_uWh = 0
return
# Low-pass battery voltage
self.car_voltage_instant_mV = voltage
self.car_voltage_mV = ((voltage * CAR_VOLTAGE_LOW_PASS_K) + (self.car_voltage_mV * (1 - CAR_VOLTAGE_LOW_PASS_K)))
statlog.gauge("car_voltage", self.car_voltage_mV / 1e3)
# Cap the car battery power and save it in a param every 10-ish seconds
self.car_battery_capacity_uWh = max(self.car_battery_capacity_uWh, 0)
self.car_battery_capacity_uWh = min(self.car_battery_capacity_uWh, CAR_BATTERY_CAPACITY_uWh)
if now - self.last_save_time >= 10:
self.params.put_nonblocking("CarBatteryCapacity", str(int(self.car_battery_capacity_uWh)))
self.last_save_time = now
# First measurement, set integration time
with self.integration_lock:
if self.last_measurement_time is None:
self.last_measurement_time = now
return
if ignition:
# If there is ignition, we integrate the charging rate of the car
with self.integration_lock:
self.power_used_uWh = 0
integration_time_h = (now - self.last_measurement_time) / 3600
if integration_time_h < 0:
raise ValueError(f"Negative integration time: {integration_time_h}h")
self.car_battery_capacity_uWh += (CAR_CHARGING_RATE_W * 1e6 * integration_time_h)
self.last_measurement_time = now
else:
# Get current power draw somehow
current_power = HARDWARE.get_current_power_draw()
# Do the integration
self._perform_integration(now, current_power)
except Exception:
cloudlog.exception("Power monitoring calculation failed")
def _perform_integration(self, t: float, current_power: float) -> None:
with self.integration_lock:
try:
if self.last_measurement_time:
integration_time_h = (t - self.last_measurement_time) / 3600
power_used = (current_power * 1000000) * integration_time_h
if power_used < 0:
raise ValueError(f"Negative power used! Integration time: {integration_time_h} h Current Power: {power_used} uWh")
self.power_used_uWh += power_used
self.car_battery_capacity_uWh -= power_used
self.last_measurement_time = t
except Exception:
cloudlog.exception("Integration failed")
# Get the power usage
def get_power_used(self) -> int:
return int(self.power_used_uWh)
def get_car_battery_capacity(self) -> int:
return int(self.car_battery_capacity_uWh)
# See if we need to shutdown
def should_shutdown(self, ignition: bool, in_car: bool, offroad_timestamp: Optional[float], started_seen: bool):
if offroad_timestamp is None:
return False
if self.download_schedule:
self.update_shutdown_time()
now = time.monotonic()
should_shutdown = False
offroad_time = (now - offroad_timestamp)
low_voltage_shutdown = (self.car_voltage_mV < (VBATT_PAUSE_CHARGING * 1e3) and
self.car_voltage_instant_mV > (VBATT_INSTANT_PAUSE_CHARGING * 1e3) and
offroad_time > VOLTAGE_SHUTDOWN_MIN_OFFROAD_TIME_S)
should_shutdown |= offroad_time > self.device_shutdown_time
should_shutdown |= low_voltage_shutdown
should_shutdown |= (self.car_battery_capacity_uWh <= 0)
should_shutdown &= not ignition
should_shutdown &= (not self.params.get_bool("DisablePowerDown"))
should_shutdown &= in_car
should_shutdown &= offroad_time > DELAY_SHUTDOWN_TIME_S if self.device_shutdown_time else True # If "Instant" is selected for the timer, shutdown immediately
should_shutdown |= self.params.get_bool("ForcePowerDown")
should_shutdown &= started_seen or (now > MIN_ON_TIME_S)
return should_shutdown
def update_shutdown_time(self):
now = datetime.now()
# Adjust for daily or weekly schedule
hours = self.download_time // 2
minutes = (self.download_time % 2) * 30
next_update_time = datetime(now.year, now.month, now.day, hours, minutes)
if now >= next_update_time:
if self.download_schedule == 1: # Daily
next_update_time += timedelta(days=1)
elif self.download_schedule == 2: # Weekly
days_to_next_sunday = (6 - now.weekday()) % 7 or 7
next_update_time += timedelta(days=days_to_next_sunday)
# Add one hour buffer to give time for download
next_update_time += timedelta(hours=1)
# Update shutdown time if next update is within 24 hours
if next_update_time - now <= timedelta(hours=24):
self.device_shutdown_time = (next_update_time - now).total_seconds()