This commit is contained in:
Your Name
2024-05-12 00:33:45 -05:00
parent 65bcdde866
commit 62115b2e34
179 changed files with 19155 additions and 390 deletions

View File

@@ -32,22 +32,20 @@ for msg in lr:
print(msg.carState.steeringAngleDeg)
```
### Segment Ranges
### MultiLogIterator
We also support a new format called a "segment range", where you can specify which segments from a route to load.
`MultiLogIterator` is similar to `LogReader`, but reads multiple logs.
```python
from openpilot.tools.lib.route import Route
from openpilot.tools.lib.logreader import MultiLogIterator
lr = LogReader("a2a0ccea32023010|2023-07-27--13-01-19/4") # 4th segment
lr = LogReader("a2a0ccea32023010|2023-07-27--13-01-19/4:6") # 4th and 5th segment
lr = LogReader("a2a0ccea32023010|2023-07-27--13-01-19/-1") # last segment
lr = LogReader("a2a0ccea32023010|2023-07-27--13-01-19/:5") # first 5 segments
lr = LogReader("a2a0ccea32023010|2023-07-27--13-01-19/1:") # all except first segment
```
and can select which type of logs to grab
```python
lr = LogReader("a2a0ccea32023010|2023-07-27--13-01-19/4/q") # get qlogs
lr = LogReader("a2a0ccea32023010|2023-07-27--13-01-19/4/r") # get rlogs (default)
# setup a MultiLogIterator to read all the logs in the route
r = Route("a2a0ccea32023010|2023-07-27--13-01-19")
lr = MultiLogIterator(r.log_paths())
# print all the steering angles values from all the logs in the route
for msg in lr:
if msg.which() == "carState":
print(msg.carState.steeringAngleDeg)
```

View File

@@ -1,10 +1,11 @@
import datetime
import functools
import re
from typing import List, Optional
from openpilot.tools.lib.auth_config import get_token
from openpilot.tools.lib.api import CommaApi
from openpilot.tools.lib.helpers import RE
from openpilot.tools.lib.helpers import RE, timestamp_to_datetime
@functools.total_ordering
@@ -16,8 +17,8 @@ class Bootlog:
if not r:
raise Exception(f"Unable to parse: {url}")
self._id = r.group('log_id')
self._dongle_id = r.group('dongle_id')
self._timestamp = r.group('timestamp')
@property
def url(self) -> str:
@@ -28,21 +29,25 @@ class Bootlog:
return self._dongle_id
@property
def id(self) -> str:
return self._id
def timestamp(self) -> str:
return self._timestamp
@property
def datetime(self) -> datetime.datetime:
return timestamp_to_datetime(self._timestamp)
def __str__(self):
return f"{self._dongle_id}/{self._id}"
return f"{self._dongle_id}|{self._timestamp}"
def __eq__(self, b) -> bool:
if not isinstance(b, Bootlog):
return False
return self.id == b.id
return self.datetime == b.datetime
def __lt__(self, b) -> bool:
if not isinstance(b, Bootlog):
return False
return self.id < b.id
return self.datetime < b.datetime
def get_bootlog_from_id(bootlog_id: str) -> Optional[Bootlog]:
# TODO: implement an API endpoint for this

View File

@@ -1,36 +1,13 @@
import os
import socket
from urllib.parse import urlparse
from openpilot.tools.lib.url_file import URLFile
DATA_ENDPOINT = os.getenv("DATA_ENDPOINT", "http://data-raw.comma.internal/")
def internal_source_available():
try:
hostname = urlparse(DATA_ENDPOINT).hostname
if hostname:
socket.gethostbyname(hostname)
return True
except socket.gaierror:
pass
return False
def resolve_name(fn):
if fn.startswith("cd:/"):
return fn.replace("cd:/", DATA_ENDPOINT)
return fn
def file_exists(fn):
fn = resolve_name(fn)
if fn.startswith(("http://", "https://")):
return URLFile(fn).get_length_online() != -1
return os.path.exists(fn)
def FileReader(fn, debug=False):
fn = resolve_name(fn)
if fn.startswith(("http://", "https://")):

View File

@@ -3,20 +3,15 @@ import datetime
TIME_FMT = "%Y-%m-%d--%H-%M-%S"
# regex patterns
class RE:
DONGLE_ID = r'(?P<dongle_id>[a-f0-9]{16})'
DONGLE_ID = r'(?P<dongle_id>[a-z0-9]{16})'
TIMESTAMP = r'(?P<timestamp>[0-9]{4}-[0-9]{2}-[0-9]{2}--[0-9]{2}-[0-9]{2}-[0-9]{2})'
LOG_ID_V2 = r'(?P<count>[a-f0-9]{8})--(?P<uid>[a-z0-9]{10})'
LOG_ID = r'(?P<log_id>(?:{}|{}))'.format(TIMESTAMP, LOG_ID_V2)
ROUTE_NAME = r'(?P<route_name>{}[|_/]{})'.format(DONGLE_ID, LOG_ID)
ROUTE_NAME = r'(?P<route_name>{}[|_/]{})'.format(DONGLE_ID, TIMESTAMP)
SEGMENT_NAME = r'{}(?:--|/)(?P<segment_num>[0-9]+)'.format(ROUTE_NAME)
INDEX = r'-?[0-9]+'
SLICE = r'(?P<start>{})?:?(?P<end>{})?:?(?P<step>{})?'.format(INDEX, INDEX, INDEX)
SEGMENT_RANGE = r'{}(?:(--|/)(?P<slice>({})))?(?:/(?P<selector>([qras])))?'.format(ROUTE_NAME, SLICE)
SEGMENT_RANGE = r'{}(?:--|/)?(?P<slice>({}))?/?(?P<selector>([qr]))?'.format(ROUTE_NAME, SLICE)
BOOTLOG_NAME = ROUTE_NAME
EXPLORER_FILE = r'^(?P<segment_name>{})--(?P<file_name>[a-z]+\.[a-z0-9]+)$'.format(SEGMENT_NAME)

View File

@@ -1,32 +1,82 @@
#!/usr/bin/env python3
import bz2
from functools import partial
import multiprocessing
import capnp
import enum
import os
import pathlib
import sys
import tqdm
import bz2
import urllib.parse
import capnp
import warnings
from typing import Callable, Dict, Iterable, Iterator, List, Optional, Type
from urllib.parse import parse_qs, urlparse
from typing import Iterable, Iterator
from cereal import log as capnp_log
from openpilot.common.swaglog import cloudlog
from openpilot.tools.lib.comma_car_segments import get_url as get_comma_segments_url
from openpilot.tools.lib.openpilotci import get_url
from openpilot.tools.lib.filereader import FileReader, file_exists, internal_source_available
from openpilot.tools.lib.route import Route, SegmentRange
from openpilot.tools.lib.filereader import FileReader
from openpilot.tools.lib.route import Route, SegmentName
LogMessage = Type[capnp._DynamicStructReader]
LogIterable = Iterable[LogMessage]
RawLogIterable = Iterable[bytes]
LogIterable = Iterable[capnp._DynamicStructReader]
# this is an iterator itself, and uses private variables from LogReader
class MultiLogIterator:
def __init__(self, log_paths, sort_by_time=False):
self._log_paths = log_paths
self.sort_by_time = sort_by_time
self._first_log_idx = next(i for i in range(len(log_paths)) if log_paths[i] is not None)
self._current_log = self._first_log_idx
self._idx = 0
self._log_readers = [None]*len(log_paths)
self.start_time = self._log_reader(self._first_log_idx)._ts[0]
def _log_reader(self, i):
if self._log_readers[i] is None and self._log_paths[i] is not None:
log_path = self._log_paths[i]
self._log_readers[i] = LogReader(log_path, sort_by_time=self.sort_by_time)
return self._log_readers[i]
def __iter__(self) -> Iterator[capnp._DynamicStructReader]:
return self
def _inc(self):
lr = self._log_reader(self._current_log)
if self._idx < len(lr._ents)-1:
self._idx += 1
else:
self._idx = 0
self._current_log = next(i for i in range(self._current_log + 1, len(self._log_readers) + 1)
if i == len(self._log_readers) or self._log_paths[i] is not None)
if self._current_log == len(self._log_readers):
raise StopIteration
def __next__(self):
while 1:
lr = self._log_reader(self._current_log)
ret = lr._ents[self._idx]
self._inc()
return ret
def tell(self):
# returns seconds from start of log
return (self._log_reader(self._current_log)._ts[self._idx] - self.start_time) * 1e-9
def seek(self, ts):
# seek to nearest minute
minute = int(ts/60)
if minute >= len(self._log_paths) or self._log_paths[minute] is None:
return False
self._current_log = minute
# HACK: O(n) seek afterward
self._idx = 0
while self.tell() < ts:
self._inc()
return True
def reset(self):
self.__init__(self._log_paths, sort_by_time=self.sort_by_time)
class _LogFileReader:
class LogReader:
def __init__(self, fn, canonicalize=True, only_union_types=False, sort_by_time=False, dat=None):
self.data_version = None
self._only_union_types = only_union_types
@@ -56,6 +106,10 @@ class _LogFileReader:
self._ents = list(sorted(_ents, key=lambda x: x.logMonoTime) if sort_by_time else _ents)
self._ts = [x.logMonoTime for x in self._ents]
@classmethod
def from_bytes(cls, dat):
return cls("", dat=dat)
def __iter__(self) -> Iterator[capnp._DynamicStructReader]:
for ent in self._ents:
if self._only_union_types:
@@ -67,222 +121,17 @@ class _LogFileReader:
else:
yield ent
class ReadMode(enum.StrEnum):
RLOG = "r" # only read rlogs
QLOG = "q" # only read qlogs
SANITIZED = "s" # read from the commaCarSegments database
AUTO = "a" # default to rlogs, fallback to qlogs
AUTO_INTERACTIVE = "i" # default to rlogs, fallback to qlogs with a prompt from the user
LogPath = Optional[str]
LogPaths = List[LogPath]
ValidFileCallable = Callable[[LogPath], bool]
Source = Callable[[SegmentRange, ReadMode], LogPaths]
InternalUnavailableException = Exception("Internal source not available")
def default_valid_file(fn: LogPath) -> bool:
return fn is not None and file_exists(fn)
def auto_strategy(rlog_paths: LogPaths, qlog_paths: LogPaths, interactive: bool, valid_file: ValidFileCallable) -> LogPaths:
# auto select logs based on availability
if any(rlog is None or not valid_file(rlog) for rlog in rlog_paths):
if interactive:
if input("Some rlogs were not found, would you like to fallback to qlogs for those segments? (y/n) ").lower() != "y":
return rlog_paths
else:
cloudlog.warning("Some rlogs were not found, falling back to qlogs for those segments...")
return [rlog if valid_file(rlog) else (qlog if valid_file(qlog) else None)
for (rlog, qlog) in zip(rlog_paths, qlog_paths, strict=True)]
return rlog_paths
def apply_strategy(mode: ReadMode, rlog_paths: LogPaths, qlog_paths: LogPaths, valid_file: ValidFileCallable = default_valid_file) -> LogPaths:
if mode == ReadMode.RLOG:
return rlog_paths
elif mode == ReadMode.QLOG:
return qlog_paths
elif mode == ReadMode.AUTO:
return auto_strategy(rlog_paths, qlog_paths, False, valid_file)
elif mode == ReadMode.AUTO_INTERACTIVE:
return auto_strategy(rlog_paths, qlog_paths, True, valid_file)
raise Exception(f"invalid mode: {mode}")
def comma_api_source(sr: SegmentRange, mode: ReadMode) -> LogPaths:
route = Route(sr.route_name)
rlog_paths = [route.log_paths()[seg] for seg in sr.seg_idxs]
qlog_paths = [route.qlog_paths()[seg] for seg in sr.seg_idxs]
# comma api will have already checked if the file exists
def valid_file(fn):
return fn is not None
return apply_strategy(mode, rlog_paths, qlog_paths, valid_file=valid_file)
def internal_source(sr: SegmentRange, mode: ReadMode) -> LogPaths:
if not internal_source_available():
raise InternalUnavailableException
def get_internal_url(sr: SegmentRange, seg, file):
return f"cd:/{sr.dongle_id}/{sr.timestamp}/{seg}/{file}.bz2"
rlog_paths = [get_internal_url(sr, seg, "rlog") for seg in sr.seg_idxs]
qlog_paths = [get_internal_url(sr, seg, "qlog") for seg in sr.seg_idxs]
return apply_strategy(mode, rlog_paths, qlog_paths)
def openpilotci_source(sr: SegmentRange, mode: ReadMode) -> LogPaths:
rlog_paths = [get_url(sr.route_name, seg, "rlog") for seg in sr.seg_idxs]
qlog_paths = [get_url(sr.route_name, seg, "qlog") for seg in sr.seg_idxs]
return apply_strategy(mode, rlog_paths, qlog_paths)
def comma_car_segments_source(sr: SegmentRange, mode=ReadMode.RLOG) -> LogPaths:
return [get_comma_segments_url(sr.route_name, seg) for seg in sr.seg_idxs]
def direct_source(file_or_url: str) -> LogPaths:
return [file_or_url]
def get_invalid_files(files):
for f in files:
if f is None or not file_exists(f):
yield f
def check_source(source: Source, *args) -> LogPaths:
files = source(*args)
assert next(get_invalid_files(files), False) is False
return files
def auto_source(sr: SegmentRange, mode=ReadMode.RLOG) -> LogPaths:
if mode == ReadMode.SANITIZED:
return comma_car_segments_source(sr, mode)
SOURCES: List[Source] = [internal_source, openpilotci_source, comma_api_source, comma_car_segments_source,]
exceptions = []
# Automatically determine viable source
for source in SOURCES:
try:
return check_source(source, sr, mode)
except Exception as e:
exceptions.append(e)
raise Exception(f"auto_source could not find any valid source, exceptions for sources: {exceptions}")
def parse_useradmin(identifier: str):
if "useradmin.comma.ai" in identifier:
query = parse_qs(urlparse(identifier).query)
return query["onebox"][0]
return None
def parse_cabana(identifier: str):
if "cabana.comma.ai" in identifier:
query = parse_qs(urlparse(identifier).query)
return query["route"][0]
return None
def parse_direct(identifier: str):
if identifier.startswith(("http://", "https://", "cd:/")) or pathlib.Path(identifier).exists():
return identifier
return None
def parse_indirect(identifier: str):
parsed = parse_useradmin(identifier) or parse_cabana(identifier)
if parsed is not None:
return parsed, comma_api_source, True
return identifier, None, False
class LogReader:
def _parse_identifiers(self, identifier: str | List[str]):
if isinstance(identifier, list):
return [i for j in identifier for i in self._parse_identifiers(j)]
parsed, source, is_indirect = parse_indirect(identifier)
if not is_indirect:
direct_parsed = parse_direct(identifier)
if direct_parsed is not None:
return direct_source(identifier)
sr = SegmentRange(parsed)
mode = self.default_mode if sr.selector is None else ReadMode(sr.selector)
source = self.default_source if source is None else source
identifiers = source(sr, mode)
invalid_count = len(list(get_invalid_files(identifiers)))
assert invalid_count == 0, f"{invalid_count}/{len(identifiers)} invalid log(s) found, please ensure all logs \
are uploaded or auto fallback to qlogs with '/a' selector at the end of the route name."
return identifiers
def __init__(self, identifier: str | List[str], default_mode: ReadMode = ReadMode.RLOG,
default_source=auto_source, sort_by_time=False, only_union_types=False):
self.default_mode = default_mode
self.default_source = default_source
self.identifier = identifier
self.sort_by_time = sort_by_time
self.only_union_types = only_union_types
self.__lrs: Dict[int, _LogFileReader] = {}
self.reset()
def _get_lr(self, i):
if i not in self.__lrs:
self.__lrs[i] = _LogFileReader(self.logreader_identifiers[i])
return self.__lrs[i]
def __iter__(self):
for i in range(len(self.logreader_identifiers)):
yield from self._get_lr(i)
def _run_on_segment(self, func, i):
return func(self._get_lr(i))
def run_across_segments(self, num_processes, func):
with multiprocessing.Pool(num_processes) as pool:
ret = []
num_segs = len(self.logreader_identifiers)
for p in tqdm.tqdm(pool.imap(partial(self._run_on_segment, func), range(num_segs)), total=num_segs):
ret.extend(p)
return ret
def reset(self):
self.logreader_identifiers = self._parse_identifiers(self.identifier)
@staticmethod
def from_bytes(dat):
return _LogFileReader("", dat=dat)
def filter(self, msg_type: str):
return (getattr(m, m.which()) for m in filter(lambda m: m.which() == msg_type, self))
def first(self, msg_type: str):
return next(self.filter(msg_type), None)
def logreader_from_route_or_segment(r, sort_by_time=False):
sn = SegmentName(r, allow_route_name=True)
route = Route(sn.route_name.canonical_name)
if sn.segment_num < 0:
return MultiLogIterator(route.log_paths(), sort_by_time=sort_by_time)
else:
return LogReader(route.log_paths()[sn.segment_num], sort_by_time=sort_by_time)
if __name__ == "__main__":
import codecs
# capnproto <= 0.8.0 throws errors converting byte data to string
# below line catches those errors and replaces the bytes with \x__
codecs.register_error("strict", codecs.backslashreplace_errors)

View File

@@ -1,10 +1,9 @@
import os
import re
from functools import cache
from urllib.parse import urlparse
from collections import defaultdict
from itertools import chain
from typing import Optional, cast
from typing import Optional
from openpilot.tools.lib.auth_config import get_token
from openpilot.tools.lib.api import CommaApi
@@ -17,7 +16,6 @@ CAMERA_FILENAMES = ['fcamera.hevc', 'video.hevc']
DCAMERA_FILENAMES = ['dcamera.hevc']
ECAMERA_FILENAMES = ['ecamera.hevc']
class Route:
def __init__(self, name, data_dir=None):
self._name = RouteName(name)
@@ -38,27 +36,27 @@ class Route:
def log_paths(self):
log_path_by_seg_num = {s.name.segment_num: s.log_path for s in self._segments}
return [log_path_by_seg_num.get(i, None) for i in range(self.max_seg_number + 1)]
return [log_path_by_seg_num.get(i, None) for i in range(self.max_seg_number+1)]
def qlog_paths(self):
qlog_path_by_seg_num = {s.name.segment_num: s.qlog_path for s in self._segments}
return [qlog_path_by_seg_num.get(i, None) for i in range(self.max_seg_number + 1)]
return [qlog_path_by_seg_num.get(i, None) for i in range(self.max_seg_number+1)]
def camera_paths(self):
camera_path_by_seg_num = {s.name.segment_num: s.camera_path for s in self._segments}
return [camera_path_by_seg_num.get(i, None) for i in range(self.max_seg_number + 1)]
return [camera_path_by_seg_num.get(i, None) for i in range(self.max_seg_number+1)]
def dcamera_paths(self):
dcamera_path_by_seg_num = {s.name.segment_num: s.dcamera_path for s in self._segments}
return [dcamera_path_by_seg_num.get(i, None) for i in range(self.max_seg_number + 1)]
return [dcamera_path_by_seg_num.get(i, None) for i in range(self.max_seg_number+1)]
def ecamera_paths(self):
ecamera_path_by_seg_num = {s.name.segment_num: s.ecamera_path for s in self._segments}
return [ecamera_path_by_seg_num.get(i, None) for i in range(self.max_seg_number + 1)]
return [ecamera_path_by_seg_num.get(i, None) for i in range(self.max_seg_number+1)]
def qcamera_paths(self):
qcamera_path_by_seg_num = {s.name.segment_num: s.qcamera_path for s in self._segments}
return [qcamera_path_by_seg_num.get(i, None) for i in range(self.max_seg_number + 1)]
return [qcamera_path_by_seg_num.get(i, None) for i in range(self.max_seg_number+1)]
# TODO: refactor this, it's super repetitive
def _get_segments_remote(self):
@@ -160,7 +158,6 @@ class Route:
raise ValueError(f'Could not find segments for route {self.name.canonical_name} in data directory {data_dir}')
return sorted(segments, key=lambda seg: seg.name.segment_num)
class Segment:
def __init__(self, name, log_path, qlog_path, camera_path, dcamera_path, ecamera_path, qcamera_path):
self._name = SegmentName(name)
@@ -175,7 +172,6 @@ class Segment:
def name(self):
return self._name
class RouteName:
def __init__(self, name_str: str):
self._name_str = name_str
@@ -197,7 +193,6 @@ class RouteName:
def __str__(self) -> str: return self._canonical_name
class SegmentName:
# TODO: add constructor that takes dongle_id, time_str, segment_num and then create instances
# of this class instead of manually constructing a segment name (use canonical_name prop instead)
@@ -210,7 +205,7 @@ class SegmentName:
seg_num_delim = "--" if self._name_str.count("--") == 2 else "/"
name_parts = self._name_str.rsplit(seg_num_delim, 1)
if allow_route_name and len(name_parts) == 1:
name_parts.append("-1") # no segment number
name_parts.append("-1") # no segment number
self._route_name = RouteName(name_parts[0])
self._num = int(name_parts[1])
self._canonical_name = f"{self._route_name._dongle_id}|{self._route_name._time_str}--{self._num}"
@@ -236,62 +231,27 @@ class SegmentName:
def __str__(self) -> str: return self._canonical_name
@cache
def get_max_seg_number_cached(sr: 'SegmentRange') -> int:
try:
api = CommaApi(get_token())
return cast(int, api.get("/v1/route/" + sr.route_name.replace("/", "|"))["segment_numbers"][-1])
except Exception as e:
raise Exception("unable to get max_segment_number. ensure you have access to this route or the route is public.") from e
class SegmentRange:
def __init__(self, segment_range: str):
m = re.fullmatch(RE.SEGMENT_RANGE, segment_range)
assert m is not None, f"Segment range is not valid {segment_range}"
self.m = m
self.m = re.fullmatch(RE.SEGMENT_RANGE, segment_range)
assert self.m, f"Segment range is not valid {segment_range}"
@property
def route_name(self) -> str:
def route_name(self):
return self.m.group("route_name")
@property
def dongle_id(self) -> str:
def dongle_id(self):
return self.m.group("dongle_id")
@property
def timestamp(self) -> str:
def timestamp(self):
return self.m.group("timestamp")
@property
def slice(self) -> str:
return self.m.group("slice") or ""
def _slice(self):
return self.m.group("slice")
@property
def selector(self) -> str | None:
def selector(self):
return self.m.group("selector")
@property
def seg_idxs(self) -> list[int]:
m = re.fullmatch(RE.SLICE, self.slice)
assert m is not None, f"Invalid slice: {self.slice}"
start, end, step = (None if s is None else int(s) for s in m.groups())
# one segment specified
if start is not None and end is None and ':' not in self.slice:
if start < 0:
start += get_max_seg_number_cached(self) + 1
return [start]
s = slice(start, end, step)
# no specified end or using relative indexing, need number of segments
if end is None or end < 0 or (start is not None and start < 0):
return list(range(get_max_seg_number_cached(self) + 1))[s]
else:
return list(range(end + 1))[s]
def __str__(self) -> str:
return f"{self.dongle_id}/{self.timestamp}" + (f"/{self.slice}" if self.slice else "") + (f"/{self.selector}" if self.selector else "")
def __repr__(self) -> str:
return self.__str__()

141
tools/lib/srreader.py Executable file
View File

@@ -0,0 +1,141 @@
import enum
import numpy as np
import pathlib
import re
from urllib.parse import parse_qs, urlparse
from openpilot.selfdrive.test.openpilotci import get_url
from openpilot.tools.lib.helpers import RE
from openpilot.tools.lib.logreader import LogReader
from openpilot.tools.lib.route import Route, SegmentRange
class ReadMode(enum.StrEnum):
RLOG = "r" # only read rlogs
QLOG = "q" # only read qlogs
#AUTO = "a" # default to rlogs, fallback to qlogs, not supported yet
def create_slice_from_string(s: str):
m = re.fullmatch(RE.SLICE, s)
assert m is not None, f"Invalid slice: {s}"
start, end, step = m.groups()
start = int(start) if start is not None else None
end = int(end) if end is not None else None
step = int(step) if step is not None else None
if start is not None and ":" not in s and end is None and step is None:
return start
return slice(start, end, step)
def parse_slice(sr: SegmentRange):
route = Route(sr.route_name)
segs = np.arange(route.max_seg_number+1)
s = create_slice_from_string(sr._slice)
return segs[s] if isinstance(s, slice) else [segs[s]]
def comma_api_source(sr: SegmentRange, mode=ReadMode.RLOG, sort_by_time=False):
segs = parse_slice(sr)
route = Route(sr.route_name)
log_paths = route.log_paths() if mode == ReadMode.RLOG else route.qlog_paths()
invalid_segs = [seg for seg in segs if log_paths[seg] is None]
assert not len(invalid_segs), f"Some of the requested segments are not available: {invalid_segs}"
for seg in segs:
yield LogReader(log_paths[seg], sort_by_time=sort_by_time)
def internal_source(sr: SegmentRange, mode=ReadMode.RLOG, sort_by_time=False):
segs = parse_slice(sr)
for seg in segs:
yield LogReader(f"cd:/{sr.dongle_id}/{sr.timestamp}/{seg}/{'rlog' if mode == ReadMode.RLOG else 'qlog'}.bz2", sort_by_time=sort_by_time)
def openpilotci_source(sr: SegmentRange, mode=ReadMode.RLOG, sort_by_time=False):
segs = parse_slice(sr)
for seg in segs:
yield LogReader(get_url(sr.route_name, seg, 'rlog' if mode == ReadMode.RLOG else 'qlog'), sort_by_time=sort_by_time)
def direct_source(file_or_url, sort_by_time):
yield LogReader(file_or_url, sort_by_time=sort_by_time)
def auto_source(*args, **kwargs):
# Automatically determine viable source
try:
next(internal_source(*args, **kwargs))
return internal_source(*args, **kwargs)
except Exception:
pass
try:
next(openpilotci_source(*args, **kwargs))
return openpilotci_source(*args, **kwargs)
except Exception:
pass
return comma_api_source(*args, **kwargs)
def parse_useradmin(identifier):
if "useradmin.comma.ai" in identifier:
query = parse_qs(urlparse(identifier).query)
return query["onebox"][0]
return None
def parse_cabana(identifier):
if "cabana.comma.ai" in identifier:
query = parse_qs(urlparse(identifier).query)
return query["route"][0]
return None
def parse_cd(identifier):
if "cd:/" in identifier:
return identifier.replace("cd:/", "")
return None
def parse_direct(identifier):
if "https://" in identifier or "http://" in identifier or pathlib.Path(identifier).exists():
return identifier
return None
def parse_indirect(identifier):
parsed = parse_useradmin(identifier) or parse_cabana(identifier)
if parsed is not None:
return parsed, comma_api_source, True
parsed = parse_cd(identifier)
if parsed is not None:
return parsed, internal_source, True
return identifier, None, False
class SegmentRangeReader:
def _logreaders_from_identifier(self, identifier):
parsed, source, is_indirect = parse_indirect(identifier)
if not is_indirect:
direct_parsed = parse_direct(identifier)
if direct_parsed is not None:
return direct_source(identifier, sort_by_time=self.sort_by_time)
sr = SegmentRange(parsed)
mode = self.default_mode if sr.selector is None else ReadMode(sr.selector)
source = self.default_source if source is None else source
return source(sr, mode, sort_by_time=self.sort_by_time)
def __init__(self, identifier: str, default_mode=ReadMode.RLOG, default_source=auto_source, sort_by_time=False):
self.default_mode = default_mode
self.default_source = default_source
self.sort_by_time = sort_by_time
self.lrs = self._logreaders_from_identifier(identifier)
def __iter__(self):
for lr in self.lrs:
for m in lr:
yield m

View File

@@ -1,11 +1,12 @@
#!/usr/bin/env python3
from functools import partial
from functools import wraps
import http.server
import os
import threading
import time
import unittest
from parameterized import parameterized
from openpilot.selfdrive.athena.tests.helpers import with_http_server
from openpilot.tools.lib.url_file import URLFile
@@ -29,7 +30,27 @@ class CachingTestRequestHandler(http.server.BaseHTTPRequestHandler):
self.end_headers()
with_caching_server = partial(with_http_server, handler=CachingTestRequestHandler)
class CachingTestServer(threading.Thread):
def run(self):
self.server = http.server.HTTPServer(("127.0.0.1", 0), CachingTestRequestHandler)
self.port = self.server.server_port
self.server.serve_forever()
def stop(self):
self.server.server_close()
self.server.shutdown()
def with_caching_server(func):
@wraps(func)
def wrapper(*args, **kwargs):
server = CachingTestServer()
server.start()
time.sleep(0.25) # wait for server to get it's port
try:
func(*args, **kwargs, port=server.port)
finally:
server.stop()
return wrapper
class TestFileDownload(unittest.TestCase):
@@ -89,10 +110,10 @@ class TestFileDownload(unittest.TestCase):
@parameterized.expand([(True, ), (False, )])
@with_caching_server
def test_recover_from_missing_file(self, cache_enabled, host):
def test_recover_from_missing_file(self, cache_enabled, port):
os.environ["FILEREADER_CACHE"] = "1" if cache_enabled else "0"
file_url = f"{host}/test.png"
file_url = f"http://localhost:{port}/test.png"
CachingTestRequestHandler.FILE_EXISTS = False
length = URLFile(file_url).get_length()

View File

@@ -0,0 +1,88 @@
import shutil
import tempfile
import numpy as np
import unittest
from parameterized import parameterized
import requests
from openpilot.tools.lib.route import SegmentRange
from openpilot.tools.lib.srreader import ReadMode, SegmentRangeReader, parse_slice, parse_indirect
NUM_SEGS = 17 # number of segments in the test route
ALL_SEGS = list(np.arange(NUM_SEGS))
TEST_ROUTE = "344c5c15b34f2d8a/2024-01-03--09-37-12"
QLOG_FILE = "https://commadataci.blob.core.windows.net/openpilotci/0375fdf7b1ce594d/2019-06-13--08-32-25/3/qlog.bz2"
class TestSegmentRangeReader(unittest.TestCase):
@parameterized.expand([
(f"{TEST_ROUTE}", ALL_SEGS),
(f"{TEST_ROUTE.replace('/', '|')}", ALL_SEGS),
(f"{TEST_ROUTE}--0", [0]),
(f"{TEST_ROUTE}--5", [5]),
(f"{TEST_ROUTE}/0", [0]),
(f"{TEST_ROUTE}/5", [5]),
(f"{TEST_ROUTE}/0:10", ALL_SEGS[0:10]),
(f"{TEST_ROUTE}/0:0", []),
(f"{TEST_ROUTE}/4:6", ALL_SEGS[4:6]),
(f"{TEST_ROUTE}/0:-1", ALL_SEGS[0:-1]),
(f"{TEST_ROUTE}/:5", ALL_SEGS[:5]),
(f"{TEST_ROUTE}/2:", ALL_SEGS[2:]),
(f"{TEST_ROUTE}/2:-1", ALL_SEGS[2:-1]),
(f"{TEST_ROUTE}/-1", [ALL_SEGS[-1]]),
(f"{TEST_ROUTE}/-2", [ALL_SEGS[-2]]),
(f"{TEST_ROUTE}/-2:-1", ALL_SEGS[-2:-1]),
(f"{TEST_ROUTE}/-4:-2", ALL_SEGS[-4:-2]),
(f"{TEST_ROUTE}/:10:2", ALL_SEGS[:10:2]),
(f"{TEST_ROUTE}/5::2", ALL_SEGS[5::2]),
(f"https://useradmin.comma.ai/?onebox={TEST_ROUTE}", ALL_SEGS),
(f"https://useradmin.comma.ai/?onebox={TEST_ROUTE.replace('/', '|')}", ALL_SEGS),
(f"https://useradmin.comma.ai/?onebox={TEST_ROUTE.replace('/', '%7C')}", ALL_SEGS),
(f"https://cabana.comma.ai/?route={TEST_ROUTE}", ALL_SEGS),
(f"cd:/{TEST_ROUTE}", ALL_SEGS),
])
def test_indirect_parsing(self, identifier, expected):
parsed, _, _ = parse_indirect(identifier)
sr = SegmentRange(parsed)
segs = parse_slice(sr)
self.assertListEqual(list(segs), expected)
def test_direct_parsing(self):
qlog = tempfile.NamedTemporaryFile(mode='wb', delete=False)
with requests.get(QLOG_FILE, stream=True) as r:
with qlog as f:
shutil.copyfileobj(r.raw, f)
for f in [QLOG_FILE, qlog.name]:
l = len(list(SegmentRangeReader(f)))
self.assertGreater(l, 100)
@parameterized.expand([
(f"{TEST_ROUTE}///",),
(f"{TEST_ROUTE}---",),
(f"{TEST_ROUTE}/-4:--2",),
(f"{TEST_ROUTE}/-a",),
(f"{TEST_ROUTE}/j",),
(f"{TEST_ROUTE}/0:1:2:3",),
(f"{TEST_ROUTE}/:::3",),
])
def test_bad_ranges(self, segment_range):
with self.assertRaises(AssertionError):
sr = SegmentRange(segment_range)
parse_slice(sr)
def test_modes(self):
qlog_len = len(list(SegmentRangeReader(f"{TEST_ROUTE}/0", ReadMode.QLOG)))
rlog_len = len(list(SegmentRangeReader(f"{TEST_ROUTE}/0", ReadMode.RLOG)))
self.assertLess(qlog_len * 6, rlog_len)
def test_modes_from_name(self):
qlog_len = len(list(SegmentRangeReader(f"{TEST_ROUTE}/0/q")))
rlog_len = len(list(SegmentRangeReader(f"{TEST_ROUTE}/0/r")))
self.assertLess(qlog_len * 6, rlog_len)
if __name__ == "__main__":
unittest.main()

View File

@@ -1,11 +1,10 @@
import logging
import os
import socket
import time
import threading
from hashlib import sha256
from urllib3 import PoolManager, Retry
from urllib3.response import BaseHTTPResponse
from urllib3 import PoolManager
from urllib3.util import Timeout
from tenacity import retry, wait_random_exponential, stop_after_attempt
from openpilot.common.file_helpers import atomic_write_in_dir
from openpilot.system.hardware.hw import Paths
@@ -13,9 +12,8 @@ from openpilot.system.hardware.hw import Paths
K = 1000
CHUNK_SIZE = 1000 * K
logging.getLogger("urllib3").setLevel(logging.WARNING)
def hash_256(link: str) -> str:
def hash_256(link):
hsh = str(sha256((link.split("?")[0]).encode('utf-8')).hexdigest())
return hsh
@@ -25,25 +23,13 @@ class URLFileException(Exception):
class URLFile:
_pool_manager: PoolManager|None = None
_tlocal = threading.local()
@staticmethod
def reset() -> None:
URLFile._pool_manager = None
@staticmethod
def pool_manager() -> PoolManager:
if URLFile._pool_manager is None:
socket_options = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),]
retries = Retry(total=5, backoff_factor=0.5, status_forcelist=[409, 429, 503, 504])
URLFile._pool_manager = PoolManager(num_pools=10, maxsize=100, socket_options=socket_options, retries=retries)
return URLFile._pool_manager
def __init__(self, url: str, timeout: int=10, debug: bool=False, cache: bool|None=None):
def __init__(self, url, debug=False, cache=None):
self._url = url
self._timeout = Timeout(connect=timeout, read=timeout)
self._pos = 0
self._length: int|None = None
self._length = None
self._local_file = None
self._debug = debug
# True by default, false if FILEREADER_CACHE is defined, but can be overwritten by the cache input
self._force_download = not int(os.environ.get("FILEREADER_CACHE", "0"))
@@ -53,23 +39,30 @@ class URLFile:
if not self._force_download:
os.makedirs(Paths.download_cache_root(), exist_ok=True)
try:
self._http_client = URLFile._tlocal.http_client
except AttributeError:
self._http_client = URLFile._tlocal.http_client = PoolManager()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback) -> None:
pass
def __exit__(self, exc_type, exc_value, traceback):
if self._local_file is not None:
os.remove(self._local_file.name)
self._local_file.close()
self._local_file = None
def _request(self, method: str, url: str, headers: dict[str, str]|None=None) -> BaseHTTPResponse:
return URLFile.pool_manager().request(method, url, timeout=self._timeout, headers=headers)
def get_length_online(self) -> int:
response = self._request('HEAD', self._url)
@retry(wait=wait_random_exponential(multiplier=1, max=5), stop=stop_after_attempt(3), reraise=True)
def get_length_online(self):
timeout = Timeout(connect=50.0, read=500.0)
response = self._http_client.request('HEAD', self._url, timeout=timeout, preload_content=False)
if not (200 <= response.status <= 299):
return -1
length = response.headers.get('content-length', 0)
return int(length)
def get_length(self) -> int:
def get_length(self):
if self._length is not None:
return self._length
@@ -86,7 +79,7 @@ class URLFile:
file_length.write(str(self._length))
return self._length
def read(self, ll: int|None=None) -> bytes:
def read(self, ll=None):
if self._force_download:
return self.read_aux(ll=ll)
@@ -118,9 +111,10 @@ class URLFile:
self._pos = file_end
return response
def read_aux(self, ll: int|None=None) -> bytes:
@retry(wait=wait_random_exponential(multiplier=1, max=5), stop=stop_after_attempt(3), reraise=True)
def read_aux(self, ll=None):
download_range = False
headers = {}
headers = {'Connection': 'keep-alive'}
if self._pos != 0 or ll is not None:
if ll is None:
end = self.get_length() - 1
@@ -134,7 +128,8 @@ class URLFile:
if self._debug:
t1 = time.time()
response = self._request('GET', self._url, headers=headers)
timeout = Timeout(connect=50.0, read=500.0)
response = self._http_client.request('GET', self._url, timeout=timeout, preload_content=False, headers=headers)
ret = response.data
if self._debug:
@@ -153,12 +148,9 @@ class URLFile:
self._pos += len(ret)
return ret
def seek(self, pos:int) -> None:
def seek(self, pos):
self._pos = pos
@property
def name(self) -> str:
def name(self):
return self._url
os.register_at_fork(after_in_child=URLFile.reset)