166 lines
7.1 KiB
Python
166 lines
7.1 KiB
Python
import time
|
|
import threading
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional
|
|
|
|
from openpilot.common.params import Params
|
|
from openpilot.system.hardware import HARDWARE
|
|
from openpilot.common.swaglog import cloudlog
|
|
from openpilot.selfdrive.statsd import statlog
|
|
|
|
# Gain of the car-voltage low-pass filter for a 45 s time constant:
# (dt/tau) / (dt/tau + 1)
CAR_VOLTAGE_LOW_PASS_K = 0.011

# A car battery takes roughly 30-60 minutes of driving to charge completely
CAR_BATTERY_CAPACITY_uWh = 30e6
CAR_CHARGING_RATE_W = 45

# Lower limit on the low-passed car battery voltage
VBATT_PAUSE_CHARGING = 11.8
# Lower limit on the instantaneous car battery voltage, so that a sudden
# power loss does not trigger a low-voltage shutdown
VBATT_INSTANT_PAUSE_CHARGING = 7.0
MAX_TIME_OFFROAD_S = 30 * 3600
MIN_ON_TIME_S = 60 * 60
# Wait at least DELAY_SHUTDOWN_TIME_S seconds after going offroad before shutting down
DELAY_SHUTDOWN_TIME_S = 5 * 60
VOLTAGE_SHUTDOWN_MIN_OFFROAD_TIME_S = 60
|
|
|
|
class PowerMonitoring:
  """Estimate the car battery state and decide when the device should power down.

  `calculate()` low-pass filters the reported car voltage and integrates
  energy: with ignition it credits CAR_CHARGING_RATE_W to the estimated
  battery capacity, without ignition it debits the device's current power
  draw. `should_shutdown()` combines the offroad timer, the voltage limits and
  the estimated capacity with several params into a shutdown decision.
  """

  def __init__(self):
    self.params = Params()
    self.last_measurement_time = None         # Used for integration delta
    self.last_save_time = 0                   # Used for saving current value in a param
    self.power_used_uWh = 0                   # Integrated power usage in uWh since going into offroad
    self.next_pulsed_measurement_time = None
    self.car_voltage_mV = 12e3                # Low-passed version of peripheralState voltage
    self.car_voltage_instant_mV = 12e3        # Last value of peripheralState voltage
    self.integration_lock = threading.Lock()

    # A missing or unparsable stored capacity is treated as 0 instead of
    # crashing the constructor.
    raw_capacity = self.params.get("CarBatteryCapacity")
    try:
      stored_capacity_uWh = 0 if raw_capacity is None else int(raw_capacity)
    except (TypeError, ValueError):
      cloudlog.exception("Invalid CarBatteryCapacity param, resetting")
      stored_capacity_uWh = 0

    # Reset capacity if it's low
    self.car_battery_capacity_uWh = max((CAR_BATTERY_CAPACITY_uWh / 10), stored_capacity_uWh)

    # FrogPilot variables
    device_shutdown_setting = self.params.get_int("DeviceShutdown")
    # Settings 0-3 select 15 minute increments (0 = instant); settings >= 4 select (setting - 3) hours
    if device_shutdown_setting >= 4:
      self.configured_shutdown_time = (device_shutdown_setting - 3) * 3600
    else:
      self.configured_shutdown_time = device_shutdown_setting * (60 * 15)
    # Offroad time limit currently in effect; update_shutdown_time() may override it
    self.device_shutdown_time = self.configured_shutdown_time
    self.download_schedule = self.params.get_int("UpdateSchedule")
    self.download_time = self.params.get_int("UpdateTime")

  # Calculation tick
  def calculate(self, voltage: Optional[int], ignition: bool) -> None:
    """Run one estimation tick.

    Args:
      voltage: car voltage in mV, or None when no peripheralState is available.
      ignition: whether ignition is on.

    Any exception raised during the tick is logged and swallowed.
    """
    try:
      now = time.monotonic()

      # If peripheralState is None, we're probably not in a car, so we don't care
      if voltage is None:
        with self.integration_lock:
          self.last_measurement_time = None
          self.next_pulsed_measurement_time = None
          self.power_used_uWh = 0
        return

      # Low-pass battery voltage
      self.car_voltage_instant_mV = voltage
      self.car_voltage_mV = ((voltage * CAR_VOLTAGE_LOW_PASS_K) + (self.car_voltage_mV * (1 - CAR_VOLTAGE_LOW_PASS_K)))
      statlog.gauge("car_voltage", self.car_voltage_mV / 1e3)

      # Cap the car battery power and save it in a param every 10-ish seconds
      self.car_battery_capacity_uWh = max(self.car_battery_capacity_uWh, 0)
      self.car_battery_capacity_uWh = min(self.car_battery_capacity_uWh, CAR_BATTERY_CAPACITY_uWh)
      if now - self.last_save_time >= 10:
        self.params.put_nonblocking("CarBatteryCapacity", str(int(self.car_battery_capacity_uWh)))
        self.last_save_time = now

      # First measurement, set integration time
      with self.integration_lock:
        if self.last_measurement_time is None:
          self.last_measurement_time = now
          return

      if ignition:
        # If there is ignition, we integrate the charging rate of the car
        with self.integration_lock:
          self.power_used_uWh = 0
          integration_time_h = (now - self.last_measurement_time) / 3600
          if integration_time_h < 0:
            raise ValueError(f"Negative integration time: {integration_time_h}h")
          self.car_battery_capacity_uWh += (CAR_CHARGING_RATE_W * 1e6 * integration_time_h)
          self.last_measurement_time = now
      else:
        # Get current power draw somehow
        current_power = HARDWARE.get_current_power_draw()

        # Do the integration
        self._perform_integration(now, current_power)
    except Exception:
      cloudlog.exception("Power monitoring calculation failed")

  def _perform_integration(self, t: float, current_power: float) -> None:
    """Integrate `current_power` over the time since the last measurement.

    Args:
      t: timestamp of this measurement, on the same clock as last_measurement_time.
      current_power: power draw; it is multiplied by 1e6 and by hours to yield uWh.

    Does nothing when there is no previous measurement. Failures are logged
    and swallowed; in that case no state is updated.
    """
    with self.integration_lock:
      try:
        # Compare against None so that a timestamp of exactly 0.0 still counts
        if self.last_measurement_time is not None:
          integration_time_h = (t - self.last_measurement_time) / 3600
          power_used = (current_power * 1000000) * integration_time_h
          if power_used < 0:
            raise ValueError(f"Negative power used! Integration time: {integration_time_h} h "
                             f"Current Power: {current_power} W Power used: {power_used} uWh")
          self.power_used_uWh += power_used
          self.car_battery_capacity_uWh -= power_used
          self.last_measurement_time = t
      except Exception:
        cloudlog.exception("Integration failed")

  # Get the power usage
  def get_power_used(self) -> int:
    """Return the integrated power usage in uWh, truncated to an int."""
    return int(self.power_used_uWh)

  def get_car_battery_capacity(self) -> int:
    """Return the estimated car battery capacity in uWh, truncated to an int."""
    return int(self.car_battery_capacity_uWh)

  # See if we need to shutdown
  def should_shutdown(self, ignition: bool, in_car: bool, offroad_timestamp: Optional[float], started_seen: bool) -> bool:
    """Decide whether the device should shut down now.

    Args:
      ignition: whether ignition is on; blocks every non-forced shutdown.
      in_car: every non-forced shutdown requires this to be True.
      offroad_timestamp: time.monotonic() value at which the device went
        offroad, or None; with None the result is always False.
      started_seen: when False, a shutdown is only allowed once the monotonic
        clock reads more than MIN_ON_TIME_S.

    Returns:
      True when a shutdown should be triggered.
    """
    if offroad_timestamp is None:
      return False

    if self.download_schedule:
      self.update_shutdown_time()

    now = time.monotonic()
    should_shutdown = False
    offroad_time = (now - offroad_timestamp)
    low_voltage_shutdown = (self.car_voltage_mV < (VBATT_PAUSE_CHARGING * 1e3) and
                            self.car_voltage_instant_mV > (VBATT_INSTANT_PAUSE_CHARGING * 1e3) and
                            offroad_time > VOLTAGE_SHUTDOWN_MIN_OFFROAD_TIME_S)
    should_shutdown |= offroad_time > self.device_shutdown_time
    should_shutdown |= low_voltage_shutdown
    should_shutdown |= (self.car_battery_capacity_uWh <= 0)
    should_shutdown &= not ignition
    should_shutdown &= (not self.params.get_bool("DisablePowerDown"))
    should_shutdown &= in_car
    # If "Instant" is selected for the timer (limit of 0), shutdown immediately
    # instead of waiting for DELAY_SHUTDOWN_TIME_S
    delay_elapsed = (offroad_time > DELAY_SHUTDOWN_TIME_S) if self.device_shutdown_time else True
    should_shutdown &= delay_elapsed
    # ForcePowerDown overrides every gate above, but not the minimum on-time below
    should_shutdown |= self.params.get_bool("ForcePowerDown")
    should_shutdown &= started_seen or (now > MIN_ON_TIME_S)
    return should_shutdown

  def update_shutdown_time(self, now: Optional[datetime] = None):
    """Adjust device_shutdown_time around the next scheduled update download.

    Args:
      now: current local time; defaults to datetime.now(). Exposed so the
        schedule arithmetic can be exercised with a fixed clock.

    When the next scheduled update (plus a one hour download buffer) is at
    most 24 hours away, device_shutdown_time is set to the seconds remaining
    until then. Otherwise the configured limit is restored, so that an
    earlier override does not linger.
    """
    if now is None:
      now = datetime.now()

    # download_time counts half-hour slots since midnight
    hours = self.download_time // 2
    minutes = (self.download_time % 2) * 30
    next_update_time = datetime(now.year, now.month, now.day, hours, minutes)

    # Adjust for daily or weekly schedule
    if self.download_schedule == 2:  # Weekly, always on a Sunday (weekday() == 6)
      days_to_next_sunday = (6 - now.weekday()) % 7
      if days_to_next_sunday == 0 and now >= next_update_time:
        # Today's Sunday slot has already passed, move to next week
        days_to_next_sunday = 7
      next_update_time += timedelta(days=days_to_next_sunday)
    elif self.download_schedule == 1 and now >= next_update_time:  # Daily
      next_update_time += timedelta(days=1)

    # Add one hour buffer to give time for download
    next_update_time += timedelta(hours=1)

    # Update shutdown time if next update is within 24 hours
    # NOTE(review): this stores the time remaining from *now*, while
    # should_shutdown() compares it with the time elapsed since going offroad;
    # confirm that this comparison is the intended one.
    time_until_update = next_update_time - now
    if time_until_update <= timedelta(hours=24):
      self.device_shutdown_time = time_until_update.total_seconds()
    else:
      self.device_shutdown_time = self.configured_shutdown_time
|