diff --git a/system/loggerd/README.md b/system/loggerd/README.md new file mode 100644 index 0000000..714e424 --- /dev/null +++ b/system/loggerd/README.md @@ -0,0 +1,30 @@ +# loggerd + +openpilot records routes in one minute chunks called segments. A route starts on the rising edge of ignition and ends on the falling edge. + +Check out our [python library](https://github.com/commaai/openpilot/blob/master/tools/lib/logreader.py) for reading openpilot logs. Also checkout our [tools](https://github.com/commaai/openpilot/tree/master/tools) to replay and view your data. These are the same tools we use to debug and develop openpilot. + +## log types + +For each segment, openpilot records the following log types: + +## rlog.bz2 + +rlogs contain all the messages passed amongst openpilot's processes. See [cereal/services.py](https://github.com/commaai/cereal/blob/master/services.py) for a list of all the logged services. They're a bzip2 archive of the serialized capnproto messages. + +## {f,e,d}camera.hevc + +Each camera stream is H.265 encoded and written to its respective file. +* fcamera.hevc is the road camera +* ecamera.hevc is the wide road camera +* dcamera.hevc is the driver camera + +## qlog.bz2 & qcamera.ts + +qlogs are a decimated subset of the rlogs. Check out [cereal/services.py](https://github.com/commaai/cereal/blob/master/services.py) for the decimation. + + +qcameras are H.264 encoded, lower res versions of the fcamera.hevc. The video shown in [comma connect](https://connect.comma.ai/) is from the qcameras. + + +qlogs and qcameras are designed to be small enough to upload instantly on slow internet and store forever, yet useful enough for most analysis and debugging. diff --git a/system/loggerd/SConscript b/system/loggerd/SConscript new file mode 100644 index 0000000..d4f52fb --- /dev/null +++ b/system/loggerd/SConscript @@ -0,0 +1,27 @@ +Import('env', 'arch', 'cereal', 'messaging', 'common', 'visionipc') + +libs = [common, cereal, messaging, visionipc, + 'zmq', 'capnp', 'kj', 'z', + 'avformat', 'avcodec', 'swscale', 'avutil', + 'yuv', 'OpenCL', 'pthread'] + +src = ['logger.cc', 'video_writer.cc', 'encoder/encoder.cc', 'encoder/v4l_encoder.cc'] +if arch != "larch64": + src += ['encoder/ffmpeg_encoder.cc'] + +if arch == "Darwin": + # fix OpenCL + del libs[libs.index('OpenCL')] + env['FRAMEWORKS'] = ['OpenCL'] + # exclude v4l + del src[src.index('encoder/v4l_encoder.cc')] + +logger_lib = env.Library('logger', src) +libs.insert(0, logger_lib) + +env.Program('loggerd', ['loggerd.cc'], LIBS=libs) +env.Program('encoderd', ['encoderd.cc'], LIBS=libs) +env.Program('bootlog.cc', LIBS=libs) + +if GetOption('extras'): + env.Program('tests/test_logger', ['tests/test_runner.cc', 'tests/test_logger.cc'], LIBS=libs + ['curl', 'crypto']) diff --git a/system/loggerd/bootlog b/system/loggerd/bootlog deleted file mode 100755 index 4878a24..0000000 Binary files a/system/loggerd/bootlog and /dev/null differ diff --git a/system/loggerd/bootlog.cc b/system/loggerd/bootlog.cc new file mode 100644 index 0000000..b8257b6 --- /dev/null +++ b/system/loggerd/bootlog.cc @@ -0,0 +1,70 @@ +#include +#include + +#include "cereal/messaging/messaging.h" +#include "common/params.h" +#include "common/swaglog.h" +#include "system/loggerd/logger.h" + + +static kj::Array build_boot_log() { + MessageBuilder msg; + auto boot = msg.initEvent().initBoot(); + + boot.setWallTimeNanos(nanos_since_epoch()); + + std::string pstore = "/sys/fs/pstore"; + std::map pstore_map = util::read_files_in_dir(pstore); + + int i = 0; + auto lpstore = boot.initPstore().initEntries(pstore_map.size()); + for (auto& kv : pstore_map) { + auto lentry = lpstore[i]; + lentry.setKey(kv.first); + lentry.setValue(capnp::Data::Reader((const kj::byte*)kv.second.data(), kv.second.size())); + i++; + } + + // Gather output of commands + std::vector bootlog_commands = { + "[ -x \"$(command -v journalctl)\" ] && journalctl", + }; + + if (Hardware::TICI()) { + bootlog_commands.push_back("[ -e /dev/nvme0 ] && sudo nvme smart-log --output-format=json /dev/nvme0"); + } + + auto commands = boot.initCommands().initEntries(bootlog_commands.size()); + for (int j = 0; j < bootlog_commands.size(); j++) { + auto lentry = commands[j]; + + lentry.setKey(bootlog_commands[j]); + + const std::string result = util::check_output(bootlog_commands[j]); + lentry.setValue(capnp::Data::Reader((const kj::byte*)result.data(), result.size())); + } + + boot.setLaunchLog(util::read_file("/tmp/launch_log")); + return capnp::messageToFlatArray(msg); +} + +int main(int argc, char** argv) { + const std::string id = logger_get_identifier("BootCount"); + const std::string path = Path::log_root() + "/boot/" + id; + LOGW("bootlog to %s", path.c_str()); + + // Open bootlog + bool r = util::create_directories(Path::log_root() + "/boot/", 0775); + assert(r); + + RawFile file(path.c_str()); + // Write initdata + file.write(logger_build_init_data().asBytes()); + // Write bootlog + file.write(build_boot_log().asBytes()); + + // Write out bootlog param to match routes with bootlog + Params().put("CurrentBootlog", id.c_str()); + + return 0; +} diff --git a/system/loggerd/config.py b/system/loggerd/config.py index 2941e39..03abe21 100644 --- a/system/loggerd/config.py +++ b/system/loggerd/config.py @@ -32,7 +32,7 @@ def get_used_bytes(default=None): try: statvfs = os.statvfs(Paths.log_root()) total_bytes = statvfs.f_blocks * statvfs.f_frsize - available_bytes = statvfs.f_bavail * statvfs.f_frsize + available_bytes = get_available_bytes(default) used_bytes = total_bytes - available_bytes except OSError: used_bytes = default diff --git a/system/loggerd/deleter.py b/system/loggerd/deleter.py index 8683401..2f0b96c 100755 --- a/system/loggerd/deleter.py +++ b/system/loggerd/deleter.py @@ -2,7 +2,6 @@ import os import shutil import threading -from typing import List from openpilot.system.hardware.hw import Paths from openpilot.common.swaglog import cloudlog from openpilot.system.loggerd.config import get_available_bytes, get_available_percent @@ -23,7 +22,7 @@ def has_preserve_xattr(d: str) -> bool: return getxattr(os.path.join(Paths.log_root(), d), PRESERVE_ATTR_NAME) == PRESERVE_ATTR_VALUE -def get_preserved_segments(dirs_by_creation: List[str]) -> List[str]: +def get_preserved_segments(dirs_by_creation: list[str]) -> list[str]: preserved = [] for n, d in enumerate(filter(has_preserve_xattr, reversed(dirs_by_creation))): if n == PRESERVE_COUNT: diff --git a/system/loggerd/encoder/encoder.cc b/system/loggerd/encoder/encoder.cc new file mode 100644 index 0000000..c4bd91b --- /dev/null +++ b/system/loggerd/encoder/encoder.cc @@ -0,0 +1,43 @@ +#include "system/loggerd/encoder/encoder.h" + +VideoEncoder::VideoEncoder(const EncoderInfo &encoder_info, int in_width, int in_height) + : encoder_info(encoder_info), in_width(in_width), in_height(in_height) { + + out_width = encoder_info.frame_width > 0 ? encoder_info.frame_width : in_width; + out_height = encoder_info.frame_height > 0 ? encoder_info.frame_height : in_height; + + pm.reset(new PubMaster({encoder_info.publish_name})); +} + +void VideoEncoder::publisher_publish(VideoEncoder *e, int segment_num, uint32_t idx, VisionIpcBufExtra &extra, + unsigned int flags, kj::ArrayPtr header, kj::ArrayPtr dat) { + // broadcast packet + MessageBuilder msg; + auto event = msg.initEvent(true); + auto edat = (event.*(e->encoder_info.init_encode_data_func))(); + auto edata = edat.initIdx(); + struct timespec ts; + timespec_get(&ts, TIME_UTC); + edat.setUnixTimestampNanos((uint64_t)ts.tv_sec*1000000000 + ts.tv_nsec); + edata.setFrameId(extra.frame_id); + edata.setTimestampSof(extra.timestamp_sof); + edata.setTimestampEof(extra.timestamp_eof); + edata.setType(e->encoder_info.encode_type); + edata.setEncodeId(e->cnt++); + edata.setSegmentNum(segment_num); + edata.setSegmentId(idx); + edata.setFlags(flags); + edata.setLen(dat.size()); + edat.setData(dat); + edat.setWidth(out_width); + edat.setHeight(out_height); + if (flags & V4L2_BUF_FLAG_KEYFRAME) edat.setHeader(header); + + uint32_t bytes_size = capnp::computeSerializedSizeInWords(msg) * sizeof(capnp::word); + if (e->msg_cache.size() < bytes_size) { + e->msg_cache.resize(bytes_size); + } + kj::ArrayOutputStream output_stream(kj::ArrayPtr(e->msg_cache.data(), bytes_size)); + capnp::writeMessage(output_stream, msg); + e->pm->send(e->encoder_info.publish_name, e->msg_cache.data(), bytes_size); +} diff --git a/system/loggerd/encoder/encoder.h b/system/loggerd/encoder/encoder.h new file mode 100644 index 0000000..7c203f9 --- /dev/null +++ b/system/loggerd/encoder/encoder.h @@ -0,0 +1,37 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "cereal/messaging/messaging.h" +#include "cereal/visionipc/visionipc.h" +#include "common/queue.h" +#include "system/camerad/cameras/camera_common.h" +#include "system/loggerd/loggerd.h" + +#define V4L2_BUF_FLAG_KEYFRAME 8 + +class VideoEncoder { +public: + VideoEncoder(const EncoderInfo &encoder_info, int in_width, int in_height); + virtual ~VideoEncoder() {} + virtual int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra) = 0; + virtual void encoder_open(const char* path) = 0; + virtual void encoder_close() = 0; + + void publisher_publish(VideoEncoder *e, int segment_num, uint32_t idx, VisionIpcBufExtra &extra, unsigned int flags, kj::ArrayPtr header, kj::ArrayPtr dat); + +protected: + int in_width, in_height; + int out_width, out_height; + const EncoderInfo encoder_info; + +private: + // total frames encoded + int cnt = 0; + std::unique_ptr pm; + std::vector msg_cache; +}; diff --git a/system/loggerd/encoder/ffmpeg_encoder.cc b/system/loggerd/encoder/ffmpeg_encoder.cc new file mode 100644 index 0000000..9d992f0 --- /dev/null +++ b/system/loggerd/encoder/ffmpeg_encoder.cc @@ -0,0 +1,150 @@ +#pragma clang diagnostic ignored "-Wdeprecated-declarations" + +#include "system/loggerd/encoder/ffmpeg_encoder.h" + +#include +#include + +#include +#include +#include + +#define __STDC_CONSTANT_MACROS + +#include "third_party/libyuv/include/libyuv.h" + +extern "C" { +#include +#include +#include +} + +#include "common/swaglog.h" +#include "common/util.h" + +const int env_debug_encoder = (getenv("DEBUG_ENCODER") != NULL) ? atoi(getenv("DEBUG_ENCODER")) : 0; + +FfmpegEncoder::FfmpegEncoder(const EncoderInfo &encoder_info, int in_width, int in_height) + : VideoEncoder(encoder_info, in_width, in_height) { + frame = av_frame_alloc(); + assert(frame); + frame->format = AV_PIX_FMT_YUV420P; + frame->width = out_width; + frame->height = out_height; + frame->linesize[0] = out_width; + frame->linesize[1] = out_width/2; + frame->linesize[2] = out_width/2; + + convert_buf.resize(in_width * in_height * 3 / 2); + + if (in_width != out_width || in_height != out_height) { + downscale_buf.resize(out_width * out_height * 3 / 2); + } +} + +FfmpegEncoder::~FfmpegEncoder() { + encoder_close(); + av_frame_free(&frame); +} + +void FfmpegEncoder::encoder_open(const char* path) { + const AVCodec *codec = avcodec_find_encoder(AV_CODEC_ID_FFVHUFF); + + this->codec_ctx = avcodec_alloc_context3(codec); + assert(this->codec_ctx); + this->codec_ctx->width = frame->width; + this->codec_ctx->height = frame->height; + this->codec_ctx->pix_fmt = AV_PIX_FMT_YUV420P; + this->codec_ctx->time_base = (AVRational){ 1, encoder_info.fps }; + int err = avcodec_open2(this->codec_ctx, codec, NULL); + assert(err >= 0); + + is_open = true; + segment_num++; + counter = 0; +} + +void FfmpegEncoder::encoder_close() { + if (!is_open) return; + + avcodec_free_context(&codec_ctx); + is_open = false; +} + +int FfmpegEncoder::encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra) { + assert(buf->width == this->in_width); + assert(buf->height == this->in_height); + + uint8_t *cy = convert_buf.data(); + uint8_t *cu = cy + in_width * in_height; + uint8_t *cv = cu + (in_width / 2) * (in_height / 2); + libyuv::NV12ToI420(buf->y, buf->stride, + buf->uv, buf->stride, + cy, in_width, + cu, in_width/2, + cv, in_width/2, + in_width, in_height); + + if (downscale_buf.size() > 0) { + uint8_t *out_y = downscale_buf.data(); + uint8_t *out_u = out_y + frame->width * frame->height; + uint8_t *out_v = out_u + (frame->width / 2) * (frame->height / 2); + libyuv::I420Scale(cy, in_width, + cu, in_width/2, + cv, in_width/2, + in_width, in_height, + out_y, frame->width, + out_u, frame->width/2, + out_v, frame->width/2, + frame->width, frame->height, + libyuv::kFilterNone); + frame->data[0] = out_y; + frame->data[1] = out_u; + frame->data[2] = out_v; + } else { + frame->data[0] = cy; + frame->data[1] = cu; + frame->data[2] = cv; + } + frame->pts = counter*50*1000; // 50ms per frame + + int ret = counter; + + int err = avcodec_send_frame(this->codec_ctx, frame); + if (err < 0) { + LOGE("avcodec_send_frame error %d", err); + ret = -1; + } + + AVPacket pkt; + av_init_packet(&pkt); + pkt.data = NULL; + pkt.size = 0; + while (ret >= 0) { + err = avcodec_receive_packet(this->codec_ctx, &pkt); + if (err == AVERROR_EOF) { + break; + } else if (err == AVERROR(EAGAIN)) { + // Encoder might need a few frames on startup to get started. Keep going + ret = 0; + break; + } else if (err < 0) { + LOGE("avcodec_receive_packet error %d", err); + ret = -1; + break; + } + + if (env_debug_encoder) { + printf("%20s got %8d bytes flags %8x idx %4d id %8d\n", encoder_info.publish_name, pkt.size, pkt.flags, counter, extra->frame_id); + } + + publisher_publish(this, segment_num, counter, *extra, + (pkt.flags & AV_PKT_FLAG_KEY) ? V4L2_BUF_FLAG_KEYFRAME : 0, + kj::arrayPtr(pkt.data, (size_t)0), // TODO: get the header + kj::arrayPtr(pkt.data, pkt.size)); + + counter++; + } + av_packet_unref(&pkt); + return ret; +} diff --git a/system/loggerd/encoder/ffmpeg_encoder.h b/system/loggerd/encoder/ffmpeg_encoder.h new file mode 100644 index 0000000..9e45a3d --- /dev/null +++ b/system/loggerd/encoder/ffmpeg_encoder.h @@ -0,0 +1,34 @@ +#pragma once + +#include +#include +#include +#include + +extern "C" { +#include +#include +#include +} + +#include "system/loggerd/encoder/encoder.h" +#include "system/loggerd/loggerd.h" + +class FfmpegEncoder : public VideoEncoder { +public: + FfmpegEncoder(const EncoderInfo &encoder_info, int in_width, int in_height); + ~FfmpegEncoder(); + int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra); + void encoder_open(const char* path); + void encoder_close(); + +private: + int segment_num = -1; + int counter = 0; + bool is_open = false; + + AVCodecContext *codec_ctx; + AVFrame *frame = NULL; + std::vector convert_buf; + std::vector downscale_buf; +}; diff --git a/system/loggerd/encoder/v4l_encoder.cc b/system/loggerd/encoder/v4l_encoder.cc new file mode 100644 index 0000000..853a17a --- /dev/null +++ b/system/loggerd/encoder/v4l_encoder.cc @@ -0,0 +1,331 @@ +#include +#include +#include +#include + +#include "system/loggerd/encoder/v4l_encoder.h" +#include "common/util.h" +#include "common/timing.h" + +#include "third_party/libyuv/include/libyuv.h" +#include "third_party/linux/include/msm_media_info.h" + +// has to be in this order +#include "third_party/linux/include/v4l2-controls.h" +#include +#define V4L2_QCOM_BUF_FLAG_CODECCONFIG 0x00020000 +#define V4L2_QCOM_BUF_FLAG_EOS 0x02000000 + +/* + kernel debugging: + echo 0xff > /sys/module/videobuf2_core/parameters/debug + echo 0x7fffffff > /sys/kernel/debug/msm_vidc/debug_level + echo 0xff > /sys/devices/platform/soc/aa00000.qcom,vidc/video4linux/video33/dev_debug +*/ +const int env_debug_encoder = (getenv("DEBUG_ENCODER") != NULL) ? atoi(getenv("DEBUG_ENCODER")) : 0; + +static void checked_ioctl(int fd, unsigned long request, void *argp) { + int ret = util::safe_ioctl(fd, request, argp); + if (ret != 0) { + LOGE("checked_ioctl failed with error %d (%d %lx %p)", errno, fd, request, argp); + assert(0); + } +} + +static void dequeue_buffer(int fd, v4l2_buf_type buf_type, unsigned int *index=NULL, unsigned int *bytesused=NULL, unsigned int *flags=NULL, struct timeval *timestamp=NULL) { + v4l2_plane plane = {0}; + v4l2_buffer v4l_buf = { + .type = buf_type, + .memory = V4L2_MEMORY_USERPTR, + .m = { .planes = &plane, }, + .length = 1, + }; + checked_ioctl(fd, VIDIOC_DQBUF, &v4l_buf); + + if (index) *index = v4l_buf.index; + if (bytesused) *bytesused = v4l_buf.m.planes[0].bytesused; + if (flags) *flags = v4l_buf.flags; + if (timestamp) *timestamp = v4l_buf.timestamp; + assert(v4l_buf.m.planes[0].data_offset == 0); +} + +static void queue_buffer(int fd, v4l2_buf_type buf_type, unsigned int index, VisionBuf *buf, struct timeval timestamp={}) { + v4l2_plane plane = { + .length = (unsigned int)buf->len, + .m = { .userptr = (unsigned long)buf->addr, }, + .bytesused = (uint32_t)buf->len, + .reserved = {(unsigned int)buf->fd} + }; + + v4l2_buffer v4l_buf = { + .type = buf_type, + .index = index, + .memory = V4L2_MEMORY_USERPTR, + .m = { .planes = &plane, }, + .length = 1, + .flags = V4L2_BUF_FLAG_TIMESTAMP_COPY, + .timestamp = timestamp + }; + + checked_ioctl(fd, VIDIOC_QBUF, &v4l_buf); +} + +static void request_buffers(int fd, v4l2_buf_type buf_type, unsigned int count) { + struct v4l2_requestbuffers reqbuf = { + .type = buf_type, + .memory = V4L2_MEMORY_USERPTR, + .count = count + }; + checked_ioctl(fd, VIDIOC_REQBUFS, &reqbuf); +} + +void V4LEncoder::dequeue_handler(V4LEncoder *e) { + std::string dequeue_thread_name = "dq-"+std::string(e->encoder_info.publish_name); + util::set_thread_name(dequeue_thread_name.c_str()); + + e->segment_num++; + uint32_t idx = -1; + bool exit = false; + + // POLLIN is capture, POLLOUT is frame + struct pollfd pfd; + pfd.events = POLLIN | POLLOUT; + pfd.fd = e->fd; + + // save the header + kj::Array header; + + while (!exit) { + int rc = poll(&pfd, 1, 1000); + if (rc < 0) { + if (errno != EINTR) { + // TODO: exit encoder? + // ignore the error and keep going + LOGE("poll failed (%d - %d)", rc, errno); + } + continue; + } else if (rc == 0) { + LOGE("encoder dequeue poll timeout"); + continue; + } + + if (env_debug_encoder >= 2) { + printf("%20s poll %x at %.2f ms\n", e->encoder_info.publish_name, pfd.revents, millis_since_boot()); + } + + int frame_id = -1; + if (pfd.revents & POLLIN) { + unsigned int bytesused, flags, index; + struct timeval timestamp; + dequeue_buffer(e->fd, V4L2_BUF_TYPE_VIDEO_CAPTURE_MPLANE, &index, &bytesused, &flags, ×tamp); + e->buf_out[index].sync(VISIONBUF_SYNC_FROM_DEVICE); + uint8_t *buf = (uint8_t*)e->buf_out[index].addr; + int64_t ts = timestamp.tv_sec * 1000000 + timestamp.tv_usec; + + // eof packet, we exit + if (flags & V4L2_QCOM_BUF_FLAG_EOS) { + exit = true; + } else if (flags & V4L2_QCOM_BUF_FLAG_CODECCONFIG) { + // save header + header = kj::heapArray(buf, bytesused); + } else { + VisionIpcBufExtra extra = e->extras.pop(); + assert(extra.timestamp_eof/1000 == ts); // stay in sync + frame_id = extra.frame_id; + ++idx; + e->publisher_publish(e, e->segment_num, idx, extra, flags, header, kj::arrayPtr(buf, bytesused)); + } + + if (env_debug_encoder) { + printf("%20s got(%d) %6d bytes flags %8x idx %3d/%4d id %8d ts %ld lat %.2f ms (%lu frames free)\n", + e->encoder_info.publish_name, index, bytesused, flags, e->segment_num, idx, frame_id, ts, millis_since_boot()-(ts/1000.), e->free_buf_in.size()); + } + + // requeue the buffer + queue_buffer(e->fd, V4L2_BUF_TYPE_VIDEO_CAPTURE_MPLANE, index, &e->buf_out[index]); + } + + if (pfd.revents & POLLOUT) { + unsigned int index; + dequeue_buffer(e->fd, V4L2_BUF_TYPE_VIDEO_OUTPUT_MPLANE, &index); + e->free_buf_in.push(index); + } + } +} + +V4LEncoder::V4LEncoder(const EncoderInfo &encoder_info, int in_width, int in_height) + : VideoEncoder(encoder_info, in_width, in_height) { + fd = open("/dev/v4l/by-path/platform-aa00000.qcom_vidc-video-index1", O_RDWR|O_NONBLOCK); + assert(fd >= 0); + + struct v4l2_capability cap; + checked_ioctl(fd, VIDIOC_QUERYCAP, &cap); + LOGD("opened encoder device %s %s = %d", cap.driver, cap.card, fd); + assert(strcmp((const char *)cap.driver, "msm_vidc_driver") == 0); + assert(strcmp((const char *)cap.card, "msm_vidc_venc") == 0); + + struct v4l2_format fmt_out = { + .type = V4L2_BUF_TYPE_VIDEO_CAPTURE_MPLANE, + .fmt = { + .pix_mp = { + // downscales are free with v4l + .width = (unsigned int)(out_width), + .height = (unsigned int)(out_height), + .pixelformat = (encoder_info.encode_type == cereal::EncodeIndex::Type::FULL_H_E_V_C) ? V4L2_PIX_FMT_HEVC : V4L2_PIX_FMT_H264, + .field = V4L2_FIELD_ANY, + .colorspace = V4L2_COLORSPACE_DEFAULT, + } + } + }; + checked_ioctl(fd, VIDIOC_S_FMT, &fmt_out); + + v4l2_streamparm streamparm = { + .type = V4L2_BUF_TYPE_VIDEO_OUTPUT_MPLANE, + .parm = { + .output = { + // TODO: more stuff here? we don't know + .timeperframe = { + .numerator = 1, + .denominator = 20 + } + } + } + }; + checked_ioctl(fd, VIDIOC_S_PARM, &streamparm); + + struct v4l2_format fmt_in = { + .type = V4L2_BUF_TYPE_VIDEO_OUTPUT_MPLANE, + .fmt = { + .pix_mp = { + .width = (unsigned int)in_width, + .height = (unsigned int)in_height, + .pixelformat = V4L2_PIX_FMT_NV12, + .field = V4L2_FIELD_ANY, + .colorspace = V4L2_COLORSPACE_470_SYSTEM_BG, + } + } + }; + checked_ioctl(fd, VIDIOC_S_FMT, &fmt_in); + + LOGD("in buffer size %d, out buffer size %d", + fmt_in.fmt.pix_mp.plane_fmt[0].sizeimage, + fmt_out.fmt.pix_mp.plane_fmt[0].sizeimage); + + // shared ctrls + { + struct v4l2_control ctrls[] = { + { .id = V4L2_CID_MPEG_VIDEO_HEADER_MODE, .value = V4L2_MPEG_VIDEO_HEADER_MODE_SEPARATE}, + { .id = V4L2_CID_MPEG_VIDEO_BITRATE, .value = encoder_info.bitrate}, + { .id = V4L2_CID_MPEG_VIDC_VIDEO_RATE_CONTROL, .value = V4L2_CID_MPEG_VIDC_VIDEO_RATE_CONTROL_VBR_CFR}, + { .id = V4L2_CID_MPEG_VIDC_VIDEO_PRIORITY, .value = V4L2_MPEG_VIDC_VIDEO_PRIORITY_REALTIME_DISABLE}, + { .id = V4L2_CID_MPEG_VIDC_VIDEO_IDR_PERIOD, .value = 1}, + }; + for (auto ctrl : ctrls) { + checked_ioctl(fd, VIDIOC_S_CTRL, &ctrl); + } + } + + if (encoder_info.encode_type == cereal::EncodeIndex::Type::FULL_H_E_V_C) { + struct v4l2_control ctrls[] = { + { .id = V4L2_CID_MPEG_VIDC_VIDEO_HEVC_PROFILE, .value = V4L2_MPEG_VIDC_VIDEO_HEVC_PROFILE_MAIN}, + { .id = V4L2_CID_MPEG_VIDC_VIDEO_HEVC_TIER_LEVEL, .value = V4L2_MPEG_VIDC_VIDEO_HEVC_LEVEL_HIGH_TIER_LEVEL_5}, + { .id = V4L2_CID_MPEG_VIDC_VIDEO_VUI_TIMING_INFO, .value = V4L2_MPEG_VIDC_VIDEO_VUI_TIMING_INFO_ENABLED}, + { .id = V4L2_CID_MPEG_VIDC_VIDEO_NUM_P_FRAMES, .value = 29}, + { .id = V4L2_CID_MPEG_VIDC_VIDEO_NUM_B_FRAMES, .value = 0}, + }; + for (auto ctrl : ctrls) { + checked_ioctl(fd, VIDIOC_S_CTRL, &ctrl); + } + } else { + struct v4l2_control ctrls[] = { + { .id = V4L2_CID_MPEG_VIDEO_H264_PROFILE, .value = V4L2_MPEG_VIDEO_H264_PROFILE_HIGH}, + { .id = V4L2_CID_MPEG_VIDEO_H264_LEVEL, .value = V4L2_MPEG_VIDEO_H264_LEVEL_UNKNOWN}, + { .id = V4L2_CID_MPEG_VIDC_VIDEO_NUM_P_FRAMES, .value = 14}, + { .id = V4L2_CID_MPEG_VIDC_VIDEO_NUM_B_FRAMES, .value = 0}, + { .id = V4L2_CID_MPEG_VIDEO_H264_ENTROPY_MODE, .value = V4L2_MPEG_VIDEO_H264_ENTROPY_MODE_CABAC}, + { .id = V4L2_CID_MPEG_VIDC_VIDEO_H264_CABAC_MODEL, .value = V4L2_CID_MPEG_VIDC_VIDEO_H264_CABAC_MODEL_0}, + { .id = V4L2_CID_MPEG_VIDEO_H264_LOOP_FILTER_MODE, .value = 0}, + { .id = V4L2_CID_MPEG_VIDEO_H264_LOOP_FILTER_ALPHA, .value = 0}, + { .id = V4L2_CID_MPEG_VIDEO_H264_LOOP_FILTER_BETA, .value = 0}, + { .id = V4L2_CID_MPEG_VIDEO_MULTI_SLICE_MODE, .value = 0}, + }; + for (auto ctrl : ctrls) { + checked_ioctl(fd, VIDIOC_S_CTRL, &ctrl); + } + } + + // allocate buffers + request_buffers(fd, V4L2_BUF_TYPE_VIDEO_CAPTURE_MPLANE, BUF_OUT_COUNT); + request_buffers(fd, V4L2_BUF_TYPE_VIDEO_OUTPUT_MPLANE, BUF_IN_COUNT); + + // start encoder + v4l2_buf_type buf_type = V4L2_BUF_TYPE_VIDEO_CAPTURE_MPLANE; + checked_ioctl(fd, VIDIOC_STREAMON, &buf_type); + buf_type = V4L2_BUF_TYPE_VIDEO_OUTPUT_MPLANE; + checked_ioctl(fd, VIDIOC_STREAMON, &buf_type); + + // queue up output buffers + for (unsigned int i = 0; i < BUF_OUT_COUNT; i++) { + buf_out[i].allocate(fmt_out.fmt.pix_mp.plane_fmt[0].sizeimage); + queue_buffer(fd, V4L2_BUF_TYPE_VIDEO_CAPTURE_MPLANE, i, &buf_out[i]); + } + // queue up input buffers + for (unsigned int i = 0; i < BUF_IN_COUNT; i++) { + free_buf_in.push(i); + } +} + +void V4LEncoder::encoder_open(const char* path) { + dequeue_handler_thread = std::thread(V4LEncoder::dequeue_handler, this); + this->is_open = true; + this->counter = 0; +} + +int V4LEncoder::encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra) { + struct timeval timestamp { + .tv_sec = (long)(extra->timestamp_eof/1000000000), + .tv_usec = (long)((extra->timestamp_eof/1000) % 1000000), + }; + + // reserve buffer + int buffer_in = free_buf_in.pop(); + + // push buffer + extras.push(*extra); + //buf->sync(VISIONBUF_SYNC_TO_DEVICE); + queue_buffer(fd, V4L2_BUF_TYPE_VIDEO_OUTPUT_MPLANE, buffer_in, buf, timestamp); + + return this->counter++; +} + +void V4LEncoder::encoder_close() { + if (this->is_open) { + // pop all the frames before closing, then put the buffers back + for (int i = 0; i < BUF_IN_COUNT; i++) free_buf_in.pop(); + for (int i = 0; i < BUF_IN_COUNT; i++) free_buf_in.push(i); + // no frames, stop the encoder + struct v4l2_encoder_cmd encoder_cmd = { .cmd = V4L2_ENC_CMD_STOP }; + checked_ioctl(fd, VIDIOC_ENCODER_CMD, &encoder_cmd); + // join waits for V4L2_QCOM_BUF_FLAG_EOS + dequeue_handler_thread.join(); + assert(extras.empty()); + } + this->is_open = false; +} + +V4LEncoder::~V4LEncoder() { + encoder_close(); + v4l2_buf_type buf_type = V4L2_BUF_TYPE_VIDEO_OUTPUT_MPLANE; + checked_ioctl(fd, VIDIOC_STREAMOFF, &buf_type); + request_buffers(fd, V4L2_BUF_TYPE_VIDEO_OUTPUT_MPLANE, 0); + buf_type = V4L2_BUF_TYPE_VIDEO_CAPTURE_MPLANE; + checked_ioctl(fd, VIDIOC_STREAMOFF, &buf_type); + request_buffers(fd, V4L2_BUF_TYPE_VIDEO_CAPTURE_MPLANE, 0); + close(fd); + + for (int i = 0; i < BUF_OUT_COUNT; i++) { + if (buf_out[i].free() != 0) { + LOGE("Failed to free buffer"); + } + } +} diff --git a/system/loggerd/encoder/v4l_encoder.h b/system/loggerd/encoder/v4l_encoder.h new file mode 100644 index 0000000..9336bf3 --- /dev/null +++ b/system/loggerd/encoder/v4l_encoder.h @@ -0,0 +1,30 @@ +#pragma once + +#include "common/queue.h" +#include "system/loggerd/encoder/encoder.h" + +#define BUF_IN_COUNT 7 +#define BUF_OUT_COUNT 6 + +class V4LEncoder : public VideoEncoder { +public: + V4LEncoder(const EncoderInfo &encoder_info, int in_width, int in_height); + ~V4LEncoder(); + int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra); + void encoder_open(const char* path); + void encoder_close(); +private: + int fd; + + bool is_open = false; + int segment_num = -1; + int counter = 0; + + SafeQueue extras; + + static void dequeue_handler(V4LEncoder *e); + std::thread dequeue_handler_thread; + + VisionBuf buf_out[BUF_OUT_COUNT]; + SafeQueue free_buf_in; +}; diff --git a/system/loggerd/encoderd b/system/loggerd/encoderd deleted file mode 100755 index 8637cd6..0000000 Binary files a/system/loggerd/encoderd and /dev/null differ diff --git a/system/loggerd/encoderd.cc b/system/loggerd/encoderd.cc new file mode 100644 index 0000000..1b45df6 --- /dev/null +++ b/system/loggerd/encoderd.cc @@ -0,0 +1,161 @@ +#include + +#include "system/loggerd/loggerd.h" + +#ifdef QCOM2 +#include "system/loggerd/encoder/v4l_encoder.h" +#define Encoder V4LEncoder +#else +#include "system/loggerd/encoder/ffmpeg_encoder.h" +#define Encoder FfmpegEncoder +#endif + +ExitHandler do_exit; + +struct EncoderdState { + int max_waiting = 0; + + // Sync logic for startup + std::atomic encoders_ready = 0; + std::atomic start_frame_id = 0; + bool camera_ready[WideRoadCam + 1] = {}; + bool camera_synced[WideRoadCam + 1] = {}; +}; + +// Handle initial encoder syncing by waiting for all encoders to reach the same frame id +bool sync_encoders(EncoderdState *s, CameraType cam_type, uint32_t frame_id) { + if (s->camera_synced[cam_type]) return true; + + if (s->max_waiting > 1 && s->encoders_ready != s->max_waiting) { + // add a small margin to the start frame id in case one of the encoders already dropped the next frame + update_max_atomic(s->start_frame_id, frame_id + 2); + if (std::exchange(s->camera_ready[cam_type], true) == false) { + ++s->encoders_ready; + LOGD("camera %d encoder ready", cam_type); + } + return false; + } else { + if (s->max_waiting == 1) update_max_atomic(s->start_frame_id, frame_id); + bool synced = frame_id >= s->start_frame_id; + s->camera_synced[cam_type] = synced; + if (!synced) LOGD("camera %d waiting for frame %d, cur %d", cam_type, (int)s->start_frame_id, frame_id); + return synced; + } +} + + +void encoder_thread(EncoderdState *s, const LogCameraInfo &cam_info) { + util::set_thread_name(cam_info.thread_name); + + std::vector> encoders; + VisionIpcClient vipc_client = VisionIpcClient("camerad", cam_info.stream_type, false); + + int cur_seg = 0; + while (!do_exit) { + if (!vipc_client.connect(false)) { + util::sleep_for(5); + continue; + } + + // init encoders + if (encoders.empty()) { + VisionBuf buf_info = vipc_client.buffers[0]; + LOGW("encoder %s init %zux%zu", cam_info.thread_name, buf_info.width, buf_info.height); + assert(buf_info.width > 0 && buf_info.height > 0); + + for (const auto &encoder_info : cam_info.encoder_infos) { + auto &e = encoders.emplace_back(new Encoder(encoder_info, buf_info.width, buf_info.height)); + e->encoder_open(nullptr); + } + } + + bool lagging = false; + while (!do_exit) { + VisionIpcBufExtra extra; + VisionBuf* buf = vipc_client.recv(&extra); + if (buf == nullptr) continue; + + // detect loop around and drop the frames + if (buf->get_frame_id() != extra.frame_id) { + if (!lagging) { + LOGE("encoder %s lag buffer id: %" PRIu64 " extra id: %d", cam_info.thread_name, buf->get_frame_id(), extra.frame_id); + lagging = true; + } + continue; + } + lagging = false; + + if (!sync_encoders(s, cam_info.type, extra.frame_id)) { + continue; + } + if (do_exit) break; + + // do rotation if required + const int frames_per_seg = SEGMENT_LENGTH * MAIN_FPS; + if (cur_seg >= 0 && extra.frame_id >= ((cur_seg + 1) * frames_per_seg) + s->start_frame_id) { + for (auto &e : encoders) { + e->encoder_close(); + e->encoder_open(NULL); + } + ++cur_seg; + } + + // encode a frame + for (int i = 0; i < encoders.size(); ++i) { + int out_id = encoders[i]->encode_frame(buf, &extra); + + if (out_id == -1) { + LOGE("Failed to encode frame. frame_id: %d", extra.frame_id); + } + } + } + } +} + +template +void encoderd_thread(const LogCameraInfo (&cameras)[N]) { + EncoderdState s; + + std::set streams; + while (!do_exit) { + streams = VisionIpcClient::getAvailableStreams("camerad", false); + if (!streams.empty()) { + break; + } + util::sleep_for(100); + } + + if (!streams.empty()) { + std::vector encoder_threads; + for (auto stream : streams) { + auto it = std::find_if(std::begin(cameras), std::end(cameras), + [stream](auto &cam) { return cam.stream_type == stream; }); + assert(it != std::end(cameras)); + ++s.max_waiting; + encoder_threads.push_back(std::thread(encoder_thread, &s, *it)); + } + + for (auto &t : encoder_threads) t.join(); + } +} + +int main(int argc, char* argv[]) { + if (!Hardware::PC()) { + int ret; + ret = util::set_realtime_priority(52); + assert(ret == 0); + ret = util::set_core_affinity({3}); + assert(ret == 0); + } + if (argc > 1) { + std::string arg1(argv[1]); + if (arg1 == "--stream") { + encoderd_thread(stream_cameras_logged); + } else { + LOGE("Argument '%s' is not supported", arg1.c_str()); + } + } else { + encoderd_thread(cameras_logged); + } + return 0; +} diff --git a/system/loggerd/logger.cc b/system/loggerd/logger.cc new file mode 100644 index 0000000..f1b187d --- /dev/null +++ b/system/loggerd/logger.cc @@ -0,0 +1,163 @@ +#include "system/loggerd/logger.h" + +#include +#include +#include +#include +#include +#include + +#include "common/params.h" +#include "common/swaglog.h" +#include "common/version.h" + +// ***** log metadata ***** +kj::Array logger_build_init_data() { + uint64_t wall_time = nanos_since_epoch(); + + MessageBuilder msg; + auto init = msg.initEvent().initInitData(); + + init.setWallTimeNanos(wall_time); + init.setVersion(COMMA_VERSION); + init.setDirty(!getenv("CLEAN")); + init.setDeviceType(Hardware::get_device_type()); + + // log kernel args + std::ifstream cmdline_stream("/proc/cmdline"); + std::vector kernel_args; + std::string buf; + while (cmdline_stream >> buf) { + kernel_args.push_back(buf); + } + + auto lkernel_args = init.initKernelArgs(kernel_args.size()); + for (int i=0; i params_map = params.readAll(); + + init.setGitCommit(params_map["GitCommit"]); + init.setGitCommitDate(params_map["GitCommitDate"]); + init.setGitBranch(params_map["GitBranch"]); + init.setGitRemote(params_map["GitRemote"]); + init.setPassive(false); + init.setDongleId(params_map["DongleId"]); + + auto lparams = init.initParams().initEntries(params_map.size()); + int j = 0; + for (auto& [key, value] : params_map) { + auto lentry = lparams[j]; + lentry.setKey(key); + if ( !(params.getKeyType(key) & DONT_LOG) ) { + lentry.setValue(capnp::Data::Reader((const kj::byte*)value.data(), value.size())); + } + j++; + } + + // log commands + std::vector log_commands = { + "df -h", // usage for all filesystems + }; + + auto hw_logs = Hardware::get_init_logs(); + + auto commands = init.initCommands().initEntries(log_commands.size() + hw_logs.size()); + for (int i = 0; i < log_commands.size(); i++) { + auto lentry = commands[i]; + + lentry.setKey(log_commands[i]); + + const std::string result = util::check_output(log_commands[i]); + lentry.setValue(capnp::Data::Reader((const kj::byte*)result.data(), result.size())); + } + + int i = log_commands.size(); + for (auto &[key, value] : hw_logs) { + auto lentry = commands[i]; + lentry.setKey(key); + lentry.setValue(capnp::Data::Reader((const kj::byte*)value.data(), value.size())); + i++; + } + + return capnp::messageToFlatArray(msg); +} + +std::string logger_get_identifier(std::string key) { + // a log identifier is a 32 bit counter, plus a 10 character unique ID. + // e.g. 000001a3--c20ba54385 + + Params params; + uint32_t cnt; + try { + cnt = std::stol(params.get(key)); + } catch (std::exception &e) { + cnt = 0; + } + params.put(key, std::to_string(cnt + 1)); + + std::stringstream ss; + std::random_device rd; + std::mt19937 mt(rd()); + std::uniform_int_distribution dist(0, 15); + for (int i = 0; i < 10; ++i) { + ss << std::hex << dist(mt); + } + + return util::string_format("%08x--%s", cnt, ss.str().c_str()); +} + +static void log_sentinel(LoggerState *log, SentinelType type, int eixt_signal = 0) { + MessageBuilder msg; + auto sen = msg.initEvent().initSentinel(); + sen.setType(type); + sen.setSignal(eixt_signal); + log->write(msg.toBytes(), true); +} + +LoggerState::LoggerState(const std::string &log_root) { + route_name = logger_get_identifier("RouteCount"); + route_path = log_root + "/" + route_name; + init_data = logger_build_init_data(); +} + +LoggerState::~LoggerState() { + if (rlog) { + log_sentinel(this, SentinelType::END_OF_ROUTE, exit_signal); + std::remove(lock_file.c_str()); + } +} + +bool LoggerState::next() { + if (rlog) { + log_sentinel(this, SentinelType::END_OF_SEGMENT); + std::remove(lock_file.c_str()); + } + + segment_path = route_path + "--" + std::to_string(++part); + bool ret = util::create_directories(segment_path, 0775); + assert(ret == true); + + const std::string rlog_path = segment_path + "/rlog"; + lock_file = rlog_path + ".lock"; + std::ofstream{lock_file}; + + rlog.reset(new RawFile(rlog_path)); + qlog.reset(new RawFile(segment_path + "/qlog")); + + // log init data & sentinel type. + write(init_data.asBytes(), true); + log_sentinel(this, part > 0 ? SentinelType::START_OF_SEGMENT : SentinelType::START_OF_ROUTE); + return true; +} + +void LoggerState::write(uint8_t* data, size_t size, bool in_qlog) { + rlog->write(data, size); + if (in_qlog) qlog->write(data, size); +} diff --git a/system/loggerd/logger.h b/system/loggerd/logger.h new file mode 100644 index 0000000..7a8490d --- /dev/null +++ b/system/loggerd/logger.h @@ -0,0 +1,55 @@ +#pragma once + +#include +#include +#include + +#include "cereal/messaging/messaging.h" +#include "common/util.h" +#include "system/hardware/hw.h" + +class RawFile { + public: + RawFile(const std::string &path) { + file = util::safe_fopen(path.c_str(), "wb"); + assert(file != nullptr); + } + ~RawFile() { + util::safe_fflush(file); + int err = fclose(file); + assert(err == 0); + } + inline void write(void* data, size_t size) { + int written = util::safe_fwrite(data, 1, size, file); + assert(written == size); + } + inline void write(kj::ArrayPtr array) { write(array.begin(), array.size()); } + + private: + FILE* file = nullptr; +}; + +typedef cereal::Sentinel::SentinelType SentinelType; + + +class LoggerState { +public: + LoggerState(const std::string& log_root = Path::log_root()); + ~LoggerState(); + bool next(); + void write(uint8_t* data, size_t size, bool in_qlog); + inline int segment() const { return part; } + inline const std::string& segmentPath() const { return segment_path; } + inline const std::string& routeName() const { return route_name; } + inline void write(kj::ArrayPtr bytes, bool in_qlog) { write(bytes.begin(), bytes.size(), in_qlog); } + inline void setExitSignal(int signal) { exit_signal = signal; } + +protected: + int part = -1, exit_signal = 0; + std::string route_path, route_name, segment_path, lock_file; + kj::Array init_data; + std::unique_ptr rlog, qlog; +}; + +kj::Array logger_build_init_data(); +std::string logger_get_identifier(std::string key); diff --git a/system/loggerd/loggerd b/system/loggerd/loggerd deleted file mode 100755 index 76d96af..0000000 Binary files a/system/loggerd/loggerd and /dev/null differ diff --git a/system/loggerd/loggerd.cc b/system/loggerd/loggerd.cc new file mode 100644 index 0000000..3c0ffc1 --- /dev/null +++ b/system/loggerd/loggerd.cc @@ -0,0 +1,313 @@ +#include + +#include +#include +#include +#include +#include + +#include "common/params.h" +#include "system/loggerd/encoder/encoder.h" +#include "system/loggerd/loggerd.h" +#include "system/loggerd/video_writer.h" + +ExitHandler do_exit; + +struct LoggerdState { + LoggerState logger; + std::atomic last_camera_seen_tms; + std::atomic ready_to_rotate; // count of encoders ready to rotate + int max_waiting = 0; + double last_rotate_tms = 0.; // last rotate time in ms +}; + +void logger_rotate(LoggerdState *s) { + bool ret =s->logger.next(); + assert(ret); + s->ready_to_rotate = 0; + s->last_rotate_tms = millis_since_boot(); + LOGW((s->logger.segment() == 0) ? "logging to %s" : "rotated to %s", s->logger.segmentPath().c_str()); +} + +void rotate_if_needed(LoggerdState *s) { + // all encoders ready, trigger rotation + bool all_ready = s->ready_to_rotate == s->max_waiting; + + // fallback logic to prevent extremely long segments in the case of camera, encoder, etc. malfunctions + bool timed_out = false; + double tms = millis_since_boot(); + double seg_length_secs = (tms - s->last_rotate_tms) / 1000.; + if ((seg_length_secs > SEGMENT_LENGTH) && !LOGGERD_TEST) { + // TODO: might be nice to put these reasons in the sentinel + if ((tms - s->last_camera_seen_tms) > NO_CAMERA_PATIENCE) { + timed_out = true; + LOGE("no camera packets seen. auto rotating"); + } else if (seg_length_secs > SEGMENT_LENGTH*1.2) { + timed_out = true; + LOGE("segment too long. auto rotating"); + } + } + + if (all_ready || timed_out) { + logger_rotate(s); + } +} + +struct RemoteEncoder { + std::unique_ptr writer; + int encoderd_segment_offset; + int current_segment = -1; + std::vector q; + int dropped_frames = 0; + bool recording = false; + bool marked_ready_to_rotate = false; + bool seen_first_packet = false; +}; + +int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct RemoteEncoder &re, const EncoderInfo &encoder_info) { + int bytes_count = 0; + + // extract the message + capnp::FlatArrayMessageReader cmsg(kj::ArrayPtr((capnp::word *)msg->getData(), msg->getSize() / sizeof(capnp::word))); + auto event = cmsg.getRoot(); + auto edata = (event.*(encoder_info.get_encode_data_func))(); + auto idx = edata.getIdx(); + auto flags = idx.getFlags(); + + // encoderd can have started long before loggerd + if (!re.seen_first_packet) { + re.seen_first_packet = true; + re.encoderd_segment_offset = idx.getSegmentNum(); + LOGD("%s: has encoderd offset %d", name.c_str(), re.encoderd_segment_offset); + } + int offset_segment_num = idx.getSegmentNum() - re.encoderd_segment_offset; + + if (offset_segment_num == s->logger.segment()) { + // loggerd is now on the segment that matches this packet + + // if this is a new segment, we close any possible old segments, move to the new, and process any queued packets + if (re.current_segment != s->logger.segment()) { + if (re.recording) { + re.writer.reset(); + re.recording = false; + } + re.current_segment = s->logger.segment(); + re.marked_ready_to_rotate = false; + // we are in this segment now, process any queued messages before this one + if (!re.q.empty()) { + for (auto &qmsg : re.q) { + bytes_count += handle_encoder_msg(s, qmsg, name, re, encoder_info); + } + re.q.clear(); + } + } + + // if we aren't recording yet, try to start, since we are in the correct segment + if (!re.recording) { + if (flags & V4L2_BUF_FLAG_KEYFRAME) { + // only create on iframe + if (re.dropped_frames) { + // this should only happen for the first segment, maybe + LOGW("%s: dropped %d non iframe packets before init", name.c_str(), re.dropped_frames); + re.dropped_frames = 0; + } + // if we aren't actually recording, don't create the writer + if (encoder_info.record) { + assert(encoder_info.filename != NULL); + re.writer.reset(new VideoWriter(s->logger.segmentPath().c_str(), + encoder_info.filename, idx.getType() != cereal::EncodeIndex::Type::FULL_H_E_V_C, + edata.getWidth(), edata.getHeight(), encoder_info.fps, idx.getType())); + // write the header + auto header = edata.getHeader(); + re.writer->write((uint8_t *)header.begin(), header.size(), idx.getTimestampEof()/1000, true, false); + } + re.recording = true; + } else { + // this is a sad case when we aren't recording, but don't have an iframe + // nothing we can do but drop the frame + delete msg; + ++re.dropped_frames; + return bytes_count; + } + } + + // we have to be recording if we are here + assert(re.recording); + + // if we are actually writing the video file, do so + if (re.writer) { + auto data = edata.getData(); + re.writer->write((uint8_t *)data.begin(), data.size(), idx.getTimestampEof()/1000, false, flags & V4L2_BUF_FLAG_KEYFRAME); + } + + // put it in log stream as the idx packet + MessageBuilder bmsg; + auto evt = bmsg.initEvent(event.getValid()); + evt.setLogMonoTime(event.getLogMonoTime()); + (evt.*(encoder_info.set_encode_idx_func))(idx); + auto new_msg = bmsg.toBytes(); + s->logger.write((uint8_t *)new_msg.begin(), new_msg.size(), true); // always in qlog? + bytes_count += new_msg.size(); + + // free the message, we used it + delete msg; + } else if (offset_segment_num > s->logger.segment()) { + // encoderd packet has a newer segment, this means encoderd has rolled over + if (!re.marked_ready_to_rotate) { + re.marked_ready_to_rotate = true; + ++s->ready_to_rotate; + LOGD("rotate %d -> %d ready %d/%d for %s", + s->logger.segment(), offset_segment_num, + s->ready_to_rotate.load(), s->max_waiting, name.c_str()); + } + // queue up all the new segment messages, they go in after the rotate + re.q.push_back(msg); + } else { + LOGE("%s: encoderd packet has a older segment!!! idx.getSegmentNum():%d s->logger.segment():%d re.encoderd_segment_offset:%d", + name.c_str(), idx.getSegmentNum(), s->logger.segment(), re.encoderd_segment_offset); + // free the message, it's useless. this should never happen + // actually, this can happen if you restart encoderd + re.encoderd_segment_offset = -s->logger.segment(); + delete msg; + } + + return bytes_count; +} + +void handle_user_flag(LoggerdState *s) { + static int prev_segment = -1; + if (s->logger.segment() == prev_segment) return; + + LOGW("preserving %s", s->logger.segmentPath().c_str()); + +#ifdef __APPLE__ + int ret = setxattr(s->logger.segmentPath().c_str(), PRESERVE_ATTR_NAME, &PRESERVE_ATTR_VALUE, 1, 0, 0); +#else + int ret = setxattr(s->logger.segmentPath().c_str(), PRESERVE_ATTR_NAME, &PRESERVE_ATTR_VALUE, 1, 0); +#endif + if (ret) { + LOGE("setxattr %s failed for %s: %s", PRESERVE_ATTR_NAME, s->logger.segmentPath().c_str(), strerror(errno)); + } + + // mark route for uploading + Params params; + std::string routes = Params().get("AthenadRecentlyViewedRoutes"); + params.put("AthenadRecentlyViewedRoutes", routes + "," + s->logger.routeName()); + + prev_segment = s->logger.segment(); +} + +void loggerd_thread() { + // setup messaging + typedef struct ServiceState { + std::string name; + int counter, freq; + bool encoder, user_flag; + } ServiceState; + std::unordered_map service_state; + std::unordered_map remote_encoders; + + std::unique_ptr ctx(Context::create()); + std::unique_ptr poller(Poller::create()); + + // subscribe to all socks + for (const auto& [_, it] : services) { + const bool encoder = util::ends_with(it.name, "EncodeData"); + const bool livestream_encoder = util::starts_with(it.name, "livestream"); + if (!it.should_log && (!encoder || livestream_encoder)) continue; + LOGD("logging %s (on port %d)", it.name.c_str(), it.port); + + SubSocket * sock = SubSocket::create(ctx.get(), it.name); + assert(sock != NULL); + poller->registerSocket(sock); + service_state[sock] = { + .name = it.name, + .counter = 0, + .freq = it.decimation, + .encoder = encoder, + .user_flag = it.name == "userFlag", + }; + } + + LoggerdState s; + // init logger + logger_rotate(&s); + Params().put("CurrentRoute", s.logger.routeName()); + + std::map encoder_infos_dict; + for (const auto &cam : cameras_logged) { + for (const auto &encoder_info : cam.encoder_infos) { + encoder_infos_dict[encoder_info.publish_name] = encoder_info; + s.max_waiting++; + } + } + + uint64_t msg_count = 0, bytes_count = 0; + double start_ts = millis_since_boot(); + while (!do_exit) { + // poll for new messages on all sockets + for (auto sock : poller->poll(1000)) { + if (do_exit) break; + + ServiceState &service = service_state[sock]; + if (service.user_flag) { + handle_user_flag(&s); + } + + // drain socket + int count = 0; + Message *msg = nullptr; + while (!do_exit && (msg = sock->receive(true))) { + const bool in_qlog = service.freq != -1 && (service.counter++ % service.freq == 0); + if (service.encoder) { + s.last_camera_seen_tms = millis_since_boot(); + bytes_count += handle_encoder_msg(&s, msg, service.name, remote_encoders[sock], encoder_infos_dict[service.name]); + } else { + s.logger.write((uint8_t *)msg->getData(), msg->getSize(), in_qlog); + bytes_count += msg->getSize(); + delete msg; + } + + rotate_if_needed(&s); + + if ((++msg_count % 1000) == 0) { + double seconds = (millis_since_boot() - start_ts) / 1000.0; + LOGD("%" PRIu64 " messages, %.2f msg/sec, %.2f KB/sec", msg_count, msg_count / seconds, bytes_count * 0.001 / seconds); + } + + count++; + if (count >= 200) { + LOGD("large volume of '%s' messages", service.name.c_str()); + break; + } + } + } + } + + LOGW("closing logger"); + s.logger.setExitSignal(do_exit.signal); + + if (do_exit.power_failure) { + LOGE("power failure"); + sync(); + LOGE("sync done"); + } + + // messaging cleanup + for (auto &[sock, service] : service_state) delete sock; +} + +int main(int argc, char** argv) { + if (!Hardware::PC()) { + int ret; + ret = util::set_core_affinity({0, 1, 2, 3}); + assert(ret == 0); + // TODO: why does this impact camerad timings? + //ret = util::set_realtime_priority(1); + //assert(ret == 0); + } + + loggerd_thread(); + + return 0; +} diff --git a/system/loggerd/loggerd.h b/system/loggerd/loggerd.h new file mode 100644 index 0000000..5476148 --- /dev/null +++ b/system/loggerd/loggerd.h @@ -0,0 +1,154 @@ +#pragma once + +#include + +#include "cereal/messaging/messaging.h" +#include "cereal/services.h" +#include "cereal/visionipc/visionipc_client.h" +#include "system/camerad/cameras/camera_common.h" +#include "system/hardware/hw.h" +#include "common/params.h" +#include "common/swaglog.h" +#include "common/util.h" + +#include "system/loggerd/logger.h" + +constexpr int MAIN_FPS = 20; +const int MAIN_BITRATE = Params().getBool("HigherBitrate") ? 20000000 : 1e7; +const int LIVESTREAM_BITRATE = 1e6; +const int QCAM_BITRATE = 256000; + +#define NO_CAMERA_PATIENCE 500 // fall back to time-based rotation if all cameras are dead + +#define INIT_ENCODE_FUNCTIONS(encode_type) \ + .get_encode_data_func = &cereal::Event::Reader::get##encode_type##Data, \ + .set_encode_idx_func = &cereal::Event::Builder::set##encode_type##Idx, \ + .init_encode_data_func = &cereal::Event::Builder::init##encode_type##Data + +const bool LOGGERD_TEST = getenv("LOGGERD_TEST"); +const int SEGMENT_LENGTH = LOGGERD_TEST ? atoi(getenv("LOGGERD_SEGMENT_LENGTH")) : 60; + +constexpr char PRESERVE_ATTR_NAME[] = "user.preserve"; +constexpr char PRESERVE_ATTR_VALUE = '1'; +class EncoderInfo { +public: + const char *publish_name; + const char *filename = NULL; + bool record = true; + int frame_width = -1; + int frame_height = -1; + int fps = MAIN_FPS; + int bitrate = MAIN_BITRATE; + cereal::EncodeIndex::Type encode_type = Hardware::PC() ? cereal::EncodeIndex::Type::BIG_BOX_LOSSLESS + : cereal::EncodeIndex::Type::FULL_H_E_V_C; + ::cereal::EncodeData::Reader (cereal::Event::Reader::*get_encode_data_func)() const; + void (cereal::Event::Builder::*set_encode_idx_func)(::cereal::EncodeIndex::Reader); + cereal::EncodeData::Builder (cereal::Event::Builder::*init_encode_data_func)(); +}; + +class LogCameraInfo { +public: + const char *thread_name; + int fps = MAIN_FPS; + CameraType type; + VisionStreamType stream_type; + std::vector encoder_infos; +}; + +const EncoderInfo main_road_encoder_info = { + .publish_name = "roadEncodeData", + .filename = "fcamera.hevc", + INIT_ENCODE_FUNCTIONS(RoadEncode), +}; + +const EncoderInfo main_wide_road_encoder_info = { + .publish_name = "wideRoadEncodeData", + .filename = "ecamera.hevc", + INIT_ENCODE_FUNCTIONS(WideRoadEncode), +}; + +const EncoderInfo main_driver_encoder_info = { + .publish_name = "driverEncodeData", + .filename = "dcamera.hevc", + .record = Params().getBool("RecordFront"), + INIT_ENCODE_FUNCTIONS(DriverEncode), +}; + +const EncoderInfo stream_road_encoder_info = { + .publish_name = "livestreamRoadEncodeData", + .encode_type = cereal::EncodeIndex::Type::QCAMERA_H264, + .record = false, + .bitrate = LIVESTREAM_BITRATE, + INIT_ENCODE_FUNCTIONS(LivestreamRoadEncode), +}; + +const EncoderInfo stream_wide_road_encoder_info = { + .publish_name = "livestreamWideRoadEncodeData", + .encode_type = cereal::EncodeIndex::Type::QCAMERA_H264, + .record = false, + .bitrate = LIVESTREAM_BITRATE, + INIT_ENCODE_FUNCTIONS(LivestreamWideRoadEncode), +}; + +const EncoderInfo stream_driver_encoder_info = { + .publish_name = "livestreamDriverEncodeData", + .encode_type = cereal::EncodeIndex::Type::QCAMERA_H264, + .record = false, + .bitrate = LIVESTREAM_BITRATE, + INIT_ENCODE_FUNCTIONS(LivestreamDriverEncode), +}; + +const EncoderInfo qcam_encoder_info = { + .publish_name = "qRoadEncodeData", + .filename = "qcamera.ts", + .bitrate = QCAM_BITRATE, + .encode_type = cereal::EncodeIndex::Type::QCAMERA_H264, + .frame_width = 526, + .frame_height = 330, + INIT_ENCODE_FUNCTIONS(QRoadEncode), +}; + +const LogCameraInfo road_camera_info{ + .thread_name = "road_cam_encoder", + .type = RoadCam, + .stream_type = VISION_STREAM_ROAD, + .encoder_infos = {main_road_encoder_info, qcam_encoder_info} +}; + +const LogCameraInfo wide_road_camera_info{ + .thread_name = "wide_road_cam_encoder", + .type = WideRoadCam, + .stream_type = VISION_STREAM_WIDE_ROAD, + .encoder_infos = {main_wide_road_encoder_info} +}; + +const LogCameraInfo driver_camera_info{ + .thread_name = "driver_cam_encoder", + .type = DriverCam, + .stream_type = VISION_STREAM_DRIVER, + .encoder_infos = {main_driver_encoder_info} +}; + +const LogCameraInfo stream_road_camera_info{ + .thread_name = "road_cam_encoder", + .type = RoadCam, + .stream_type = VISION_STREAM_ROAD, + .encoder_infos = {stream_road_encoder_info} +}; + +const LogCameraInfo stream_wide_road_camera_info{ + .thread_name = "wide_road_cam_encoder", + .type = WideRoadCam, + .stream_type = VISION_STREAM_WIDE_ROAD, + .encoder_infos = {stream_wide_road_encoder_info} +}; + +const LogCameraInfo stream_driver_camera_info{ + .thread_name = "driver_cam_encoder", + .type = DriverCam, + .stream_type = VISION_STREAM_DRIVER, + .encoder_infos = {stream_driver_encoder_info} +}; + +const LogCameraInfo cameras_logged[] = {road_camera_info, wide_road_camera_info, driver_camera_info}; +const LogCameraInfo stream_cameras_logged[] = {stream_road_camera_info, stream_wide_road_camera_info, stream_driver_camera_info}; diff --git a/system/loggerd/tests/__init__.py b/system/loggerd/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/system/loggerd/tests/loggerd_tests_common.py b/system/loggerd/tests/loggerd_tests_common.py new file mode 100644 index 0000000..877c872 --- /dev/null +++ b/system/loggerd/tests/loggerd_tests_common.py @@ -0,0 +1,91 @@ +import os +import random +import unittest +from pathlib import Path + + +import openpilot.system.loggerd.deleter as deleter +import openpilot.system.loggerd.uploader as uploader +from openpilot.common.params import Params +from openpilot.system.hardware.hw import Paths +from openpilot.system.loggerd.xattr_cache import setxattr + + +def create_random_file(file_path: Path, size_mb: float, lock: bool = False, upload_xattr: bytes = None) -> None: + file_path.parent.mkdir(parents=True, exist_ok=True) + + if lock: + lock_path = str(file_path) + ".lock" + os.close(os.open(lock_path, os.O_CREAT | os.O_EXCL)) + + chunks = 128 + chunk_bytes = int(size_mb * 1024 * 1024 / chunks) + data = os.urandom(chunk_bytes) + + with open(file_path, "wb") as f: + for _ in range(chunks): + f.write(data) + + if upload_xattr is not None: + setxattr(str(file_path), uploader.UPLOAD_ATTR_NAME, upload_xattr) + +class MockResponse(): + def __init__(self, text, status_code): + self.text = text + self.status_code = status_code + +class MockApi(): + def __init__(self, dongle_id): + pass + + def get(self, *args, **kwargs): + return MockResponse('{"url": "http://localhost/does/not/exist", "headers": {}}', 200) + + def get_token(self): + return "fake-token" + +class MockApiIgnore(): + def __init__(self, dongle_id): + pass + + def get(self, *args, **kwargs): + return MockResponse('', 412) + + def get_token(self): + return "fake-token" + +class UploaderTestCase(unittest.TestCase): + f_type = "UNKNOWN" + + root: Path + seg_num: int + seg_format: str + seg_format2: str + seg_dir: str + + def set_ignore(self): + uploader.Api = MockApiIgnore + + def setUp(self): + uploader.Api = MockApi + uploader.fake_upload = True + uploader.force_wifi = True + uploader.allow_sleep = False + self.seg_num = random.randint(1, 300) + self.seg_format = "00000004--0ac3964c96--{}" + self.seg_format2 = "00000005--4c4e99b08b--{}" + self.seg_dir = self.seg_format.format(self.seg_num) + + self.params = Params() + self.params.put("IsOffroad", "1") + self.params.put("DongleId", "0000000000000000") + + def make_file_with_data(self, f_dir: str, fn: str, size_mb: float = .1, lock: bool = False, + upload_xattr: bytes = None, preserve_xattr: bytes = None) -> Path: + file_path = Path(Paths.log_root()) / f_dir / fn + create_random_file(file_path, size_mb, lock, upload_xattr) + + if preserve_xattr is not None: + setxattr(str(file_path.parent), deleter.PRESERVE_ATTR_NAME, preserve_xattr) + + return file_path diff --git a/system/loggerd/tests/test_deleter.py b/system/loggerd/tests/test_deleter.py new file mode 100644 index 0000000..37d2550 --- /dev/null +++ b/system/loggerd/tests/test_deleter.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python3 +import time +import threading +import unittest +from collections import namedtuple +from pathlib import Path +from collections.abc import Sequence + +import openpilot.system.loggerd.deleter as deleter +from openpilot.common.timeout import Timeout, TimeoutException +from openpilot.system.loggerd.tests.loggerd_tests_common import UploaderTestCase + +Stats = namedtuple("Stats", ['f_bavail', 'f_blocks', 'f_frsize']) + + +class TestDeleter(UploaderTestCase): + def fake_statvfs(self, d): + return self.fake_stats + + def setUp(self): + self.f_type = "fcamera.hevc" + super().setUp() + self.fake_stats = Stats(f_bavail=0, f_blocks=10, f_frsize=4096) + deleter.os.statvfs = self.fake_statvfs + + def start_thread(self): + self.end_event = threading.Event() + self.del_thread = threading.Thread(target=deleter.deleter_thread, args=[self.end_event]) + self.del_thread.daemon = True + self.del_thread.start() + + def join_thread(self): + self.end_event.set() + self.del_thread.join() + + def test_delete(self): + f_path = self.make_file_with_data(self.seg_dir, self.f_type, 1) + + self.start_thread() + + try: + with Timeout(2, "Timeout waiting for file to be deleted"): + while f_path.exists(): + time.sleep(0.01) + finally: + self.join_thread() + + def assertDeleteOrder(self, f_paths: Sequence[Path], timeout: int = 5) -> None: + deleted_order = [] + + self.start_thread() + try: + with Timeout(timeout, "Timeout waiting for files to be deleted"): + while True: + for f in f_paths: + if not f.exists() and f not in deleted_order: + deleted_order.append(f) + if len(deleted_order) == len(f_paths): + break + time.sleep(0.01) + except TimeoutException: + print("Not deleted:", [f for f in f_paths if f not in deleted_order]) + raise + finally: + self.join_thread() + + self.assertEqual(deleted_order, f_paths, "Files not deleted in expected order") + + def test_delete_order(self): + self.assertDeleteOrder([ + self.make_file_with_data(self.seg_format.format(0), self.f_type), + self.make_file_with_data(self.seg_format.format(1), self.f_type), + self.make_file_with_data(self.seg_format2.format(0), self.f_type), + ]) + + def test_delete_many_preserved(self): + self.assertDeleteOrder([ + self.make_file_with_data(self.seg_format.format(0), self.f_type), + self.make_file_with_data(self.seg_format.format(1), self.f_type, preserve_xattr=deleter.PRESERVE_ATTR_VALUE), + self.make_file_with_data(self.seg_format.format(2), self.f_type), + ] + [ + self.make_file_with_data(self.seg_format2.format(i), self.f_type, preserve_xattr=deleter.PRESERVE_ATTR_VALUE) + for i in range(5) + ]) + + def test_delete_last(self): + self.assertDeleteOrder([ + self.make_file_with_data(self.seg_format.format(1), self.f_type), + self.make_file_with_data(self.seg_format2.format(0), self.f_type), + self.make_file_with_data(self.seg_format.format(0), self.f_type, preserve_xattr=deleter.PRESERVE_ATTR_VALUE), + self.make_file_with_data("boot", self.seg_format[:-4]), + self.make_file_with_data("crash", self.seg_format2[:-4]), + ]) + + def test_no_delete_when_available_space(self): + f_path = self.make_file_with_data(self.seg_dir, self.f_type) + + block_size = 4096 + available = (10 * 1024 * 1024 * 1024) / block_size # 10GB free + self.fake_stats = Stats(f_bavail=available, f_blocks=10, f_frsize=block_size) + + self.start_thread() + start_time = time.monotonic() + while f_path.exists() and time.monotonic() - start_time < 2: + time.sleep(0.01) + self.join_thread() + + self.assertTrue(f_path.exists(), "File deleted with available space") + + def test_no_delete_with_lock_file(self): + f_path = self.make_file_with_data(self.seg_dir, self.f_type, lock=True) + + self.start_thread() + start_time = time.monotonic() + while f_path.exists() and time.monotonic() - start_time < 2: + time.sleep(0.01) + self.join_thread() + + self.assertTrue(f_path.exists(), "File deleted when locked") + + +if __name__ == "__main__": + unittest.main() diff --git a/system/loggerd/tests/test_encoder.py b/system/loggerd/tests/test_encoder.py new file mode 100644 index 0000000..bd076dc --- /dev/null +++ b/system/loggerd/tests/test_encoder.py @@ -0,0 +1,156 @@ +#!/usr/bin/env python3 +import math +import os +import pytest +import random +import shutil +import subprocess +import time +import unittest +from pathlib import Path + +from parameterized import parameterized +from tqdm import trange + +from openpilot.common.params import Params +from openpilot.common.timeout import Timeout +from openpilot.system.hardware import TICI +from openpilot.selfdrive.manager.process_config import managed_processes +from openpilot.tools.lib.logreader import LogReader +from openpilot.system.hardware.hw import Paths + +SEGMENT_LENGTH = 2 +FULL_SIZE = 2507572 +CAMERAS = [ + ("fcamera.hevc", 20, FULL_SIZE, "roadEncodeIdx"), + ("dcamera.hevc", 20, FULL_SIZE, "driverEncodeIdx"), + ("ecamera.hevc", 20, FULL_SIZE, "wideRoadEncodeIdx"), + ("qcamera.ts", 20, 130000, None), +] + +# we check frame count, so we don't have to be too strict on size +FILE_SIZE_TOLERANCE = 0.5 + + +@pytest.mark.tici # TODO: all of loggerd should work on PC +class TestEncoder(unittest.TestCase): + + def setUp(self): + self._clear_logs() + os.environ["LOGGERD_TEST"] = "1" + os.environ["LOGGERD_SEGMENT_LENGTH"] = str(SEGMENT_LENGTH) + + def tearDown(self): + self._clear_logs() + + def _clear_logs(self): + if os.path.exists(Paths.log_root()): + shutil.rmtree(Paths.log_root()) + + def _get_latest_segment_path(self): + last_route = sorted(Path(Paths.log_root()).iterdir())[-1] + return os.path.join(Paths.log_root(), last_route) + + # TODO: this should run faster than real time + @parameterized.expand([(True, ), (False, )]) + def test_log_rotation(self, record_front): + Params().put_bool("RecordFront", record_front) + + managed_processes['sensord'].start() + managed_processes['loggerd'].start() + managed_processes['encoderd'].start() + + time.sleep(1.0) + managed_processes['camerad'].start() + + num_segments = int(os.getenv("SEGMENTS", random.randint(10, 15))) + + # wait for loggerd to make the dir for first segment + route_prefix_path = None + with Timeout(int(SEGMENT_LENGTH*3)): + while route_prefix_path is None: + try: + route_prefix_path = self._get_latest_segment_path().rsplit("--", 1)[0] + except Exception: + time.sleep(0.1) + + def check_seg(i): + # check each camera file size + counts = [] + first_frames = [] + for camera, fps, size, encode_idx_name in CAMERAS: + if not record_front and "dcamera" in camera: + continue + + file_path = f"{route_prefix_path}--{i}/{camera}" + + # check file exists + self.assertTrue(os.path.exists(file_path), f"segment #{i}: '{file_path}' missing") + + # TODO: this ffprobe call is really slow + # check frame count + cmd = f"ffprobe -v error -select_streams v:0 -count_packets -show_entries stream=nb_read_packets -of csv=p=0 {file_path}" + if TICI: + cmd = "LD_LIBRARY_PATH=/usr/local/lib " + cmd + + expected_frames = fps * SEGMENT_LENGTH + probe = subprocess.check_output(cmd, shell=True, encoding='utf8') + frame_count = int(probe.split('\n')[0].strip()) + counts.append(frame_count) + + self.assertEqual(frame_count, expected_frames, + f"segment #{i}: {camera} failed frame count check: expected {expected_frames}, got {frame_count}") + + # sanity check file size + file_size = os.path.getsize(file_path) + self.assertTrue(math.isclose(file_size, size, rel_tol=FILE_SIZE_TOLERANCE), + f"{file_path} size {file_size} isn't close to target size {size}") + + # Check encodeIdx + if encode_idx_name is not None: + rlog_path = f"{route_prefix_path}--{i}/rlog" + msgs = [m for m in LogReader(rlog_path) if m.which() == encode_idx_name] + encode_msgs = [getattr(m, encode_idx_name) for m in msgs] + + valid = [m.valid for m in msgs] + segment_idxs = [m.segmentId for m in encode_msgs] + encode_idxs = [m.encodeId for m in encode_msgs] + frame_idxs = [m.frameId for m in encode_msgs] + + # Check frame count + self.assertEqual(frame_count, len(segment_idxs)) + self.assertEqual(frame_count, len(encode_idxs)) + + # Check for duplicates or skips + self.assertEqual(0, segment_idxs[0]) + self.assertEqual(len(set(segment_idxs)), len(segment_idxs)) + + self.assertTrue(all(valid)) + + self.assertEqual(expected_frames * i, encode_idxs[0]) + first_frames.append(frame_idxs[0]) + self.assertEqual(len(set(encode_idxs)), len(encode_idxs)) + + self.assertEqual(1, len(set(first_frames))) + + if TICI: + expected_frames = fps * SEGMENT_LENGTH + self.assertEqual(min(counts), expected_frames) + shutil.rmtree(f"{route_prefix_path}--{i}") + + try: + for i in trange(num_segments): + # poll for next segment + with Timeout(int(SEGMENT_LENGTH*10), error_msg=f"timed out waiting for segment {i}"): + while Path(f"{route_prefix_path}--{i+1}") not in Path(Paths.log_root()).iterdir(): + time.sleep(0.1) + check_seg(i) + finally: + managed_processes['loggerd'].stop() + managed_processes['encoderd'].stop() + managed_processes['camerad'].stop() + managed_processes['sensord'].stop() + + +if __name__ == "__main__": + unittest.main() diff --git a/system/loggerd/tests/test_logger.cc b/system/loggerd/tests/test_logger.cc new file mode 100644 index 0000000..2dae136 --- /dev/null +++ b/system/loggerd/tests/test_logger.cc @@ -0,0 +1,74 @@ +#include "catch2/catch.hpp" +#include "system/loggerd/logger.h" + +typedef cereal::Sentinel::SentinelType SentinelType; + +void verify_segment(const std::string &route_path, int segment, int max_segment, int required_event_cnt) { + const std::string segment_path = route_path + "--" + std::to_string(segment); + SentinelType begin_sentinel = segment == 0 ? SentinelType::START_OF_ROUTE : SentinelType::START_OF_SEGMENT; + SentinelType end_sentinel = segment == max_segment - 1 ? SentinelType::END_OF_ROUTE : SentinelType::END_OF_SEGMENT; + + REQUIRE(!util::file_exists(segment_path + "/rlog.lock")); + for (const char *fn : {"/rlog", "/qlog"}) { + const std::string log_file = segment_path + fn; + std::string log = util::read_file(log_file); + REQUIRE(!log.empty()); + int event_cnt = 0, i = 0; + kj::ArrayPtr words((capnp::word *)log.data(), log.size() / sizeof(capnp::word)); + while (words.size() > 0) { + try { + capnp::FlatArrayMessageReader reader(words); + auto event = reader.getRoot(); + words = kj::arrayPtr(reader.getEnd(), words.end()); + if (i == 0) { + REQUIRE(event.which() == cereal::Event::INIT_DATA); + } else if (i == 1) { + REQUIRE(event.which() == cereal::Event::SENTINEL); + REQUIRE(event.getSentinel().getType() == begin_sentinel); + REQUIRE(event.getSentinel().getSignal() == 0); + } else if (words.size() > 0) { + REQUIRE(event.which() == cereal::Event::CLOCKS); + ++event_cnt; + } else { + // the last event must be SENTINEL + REQUIRE(event.which() == cereal::Event::SENTINEL); + REQUIRE(event.getSentinel().getType() == end_sentinel); + REQUIRE(event.getSentinel().getSignal() == (end_sentinel == SentinelType::END_OF_ROUTE ? 1 : 0)); + } + ++i; + } catch (const kj::Exception &ex) { + INFO("failed parse " << i << " exception :" << ex.getDescription()); + REQUIRE(0); + break; + } + } + REQUIRE(event_cnt == required_event_cnt); + } +} + +void write_msg(LoggerState *logger) { + MessageBuilder msg; + msg.initEvent().initClocks(); + logger->write(msg.toBytes(), true); +} + +TEST_CASE("logger") { + const int segment_cnt = 100; + const std::string log_root = "/tmp/test_logger"; + system(("rm " + log_root + " -rf").c_str()); + std::string route_name; + { + LoggerState logger(log_root); + route_name = logger.routeName(); + for (int i = 0; i < segment_cnt; ++i) { + REQUIRE(logger.next()); + REQUIRE(util::file_exists(logger.segmentPath() + "/rlog.lock")); + REQUIRE(logger.segment() == i); + write_msg(&logger); + } + logger.setExitSignal(1); + } + for (int i = 0; i < segment_cnt; ++i) { + verify_segment(log_root + "/" + route_name, i, segment_cnt, 1); + } +} diff --git a/system/loggerd/tests/test_loggerd.py b/system/loggerd/tests/test_loggerd.py new file mode 100644 index 0000000..fdea60a --- /dev/null +++ b/system/loggerd/tests/test_loggerd.py @@ -0,0 +1,285 @@ +#!/usr/bin/env python3 +import numpy as np +import os +import re +import random +import string +import subprocess +import time +from collections import defaultdict +from pathlib import Path +from flaky import flaky + +import cereal.messaging as messaging +from cereal import log +from cereal.services import SERVICE_LIST +from openpilot.common.basedir import BASEDIR +from openpilot.common.params import Params +from openpilot.common.timeout import Timeout +from openpilot.system.hardware.hw import Paths +from openpilot.system.loggerd.xattr_cache import getxattr +from openpilot.system.loggerd.deleter import PRESERVE_ATTR_NAME, PRESERVE_ATTR_VALUE +from openpilot.selfdrive.manager.process_config import managed_processes +from openpilot.system.version import get_version +from openpilot.tools.lib.helpers import RE +from openpilot.tools.lib.logreader import LogReader +from cereal.visionipc import VisionIpcServer, VisionStreamType +from openpilot.common.transformations.camera import DEVICE_CAMERAS + +SentinelType = log.Sentinel.SentinelType + +CEREAL_SERVICES = [f for f in log.Event.schema.union_fields if f in SERVICE_LIST + and SERVICE_LIST[f].should_log and "encode" not in f.lower()] + + +class TestLoggerd: + def _get_latest_log_dir(self): + log_dirs = sorted(Path(Paths.log_root()).iterdir(), key=lambda f: f.stat().st_mtime) + return log_dirs[-1] + + def _get_log_dir(self, x): + for l in x.splitlines(): + for p in l.split(' '): + path = Path(p.strip()) + if path.is_dir(): + return path + return None + + def _get_log_fn(self, x): + for l in x.splitlines(): + for p in l.split(' '): + path = Path(p.strip()) + if path.is_file(): + return path + return None + + def _gen_bootlog(self): + with Timeout(5): + out = subprocess.check_output("./bootlog", cwd=os.path.join(BASEDIR, "system/loggerd"), encoding='utf-8') + + log_fn = self._get_log_fn(out) + + # check existence + assert log_fn is not None + + return log_fn + + def _check_init_data(self, msgs): + msg = msgs[0] + assert msg.which() == 'initData' + + def _check_sentinel(self, msgs, route): + start_type = SentinelType.startOfRoute if route else SentinelType.startOfSegment + assert msgs[1].sentinel.type == start_type + + end_type = SentinelType.endOfRoute if route else SentinelType.endOfSegment + assert msgs[-1].sentinel.type == end_type + + def _publish_random_messages(self, services: list[str]) -> dict[str, list]: + pm = messaging.PubMaster(services) + + managed_processes["loggerd"].start() + for s in services: + assert pm.wait_for_readers_to_update(s, timeout=5) + + sent_msgs = defaultdict(list) + for _ in range(random.randint(2, 10) * 100): + for s in services: + try: + m = messaging.new_message(s) + except Exception: + m = messaging.new_message(s, random.randint(2, 10)) + pm.send(s, m) + sent_msgs[s].append(m) + + for s in services: + assert pm.wait_for_readers_to_update(s, timeout=5) + managed_processes["loggerd"].stop() + + return sent_msgs + + def test_init_data_values(self): + os.environ["CLEAN"] = random.choice(["0", "1"]) + + dongle = ''.join(random.choice(string.printable) for n in range(random.randint(1, 100))) + fake_params = [ + # param, initData field, value + ("DongleId", "dongleId", dongle), + ("GitCommit", "gitCommit", "commit"), + ("GitCommitDate", "gitCommitDate", "date"), + ("GitBranch", "gitBranch", "branch"), + ("GitRemote", "gitRemote", "remote"), + ] + params = Params() + for k, _, v in fake_params: + params.put(k, v) + params.put("AccessToken", "abc") + + lr = list(LogReader(str(self._gen_bootlog()))) + initData = lr[0].initData + + assert initData.dirty != bool(os.environ["CLEAN"]) + assert initData.version == get_version() + + if os.path.isfile("/proc/cmdline"): + with open("/proc/cmdline") as f: + assert list(initData.kernelArgs) == f.read().strip().split(" ") + + with open("/proc/version") as f: + assert initData.kernelVersion == f.read() + + # check params + logged_params = {entry.key: entry.value for entry in initData.params.entries} + expected_params = {k for k, _, __ in fake_params} | {'AccessToken', 'BootCount'} + assert set(logged_params.keys()) == expected_params, set(logged_params.keys()) ^ expected_params + assert logged_params['AccessToken'] == b'', f"DONT_LOG param value was logged: {repr(logged_params['AccessToken'])}" + for param_key, initData_key, v in fake_params: + assert getattr(initData, initData_key) == v + assert logged_params[param_key].decode() == v + + @flaky(max_runs=3) + def test_rotation(self): + os.environ["LOGGERD_TEST"] = "1" + Params().put("RecordFront", "1") + + d = DEVICE_CAMERAS[("tici", "ar0231")] + expected_files = {"rlog", "qlog", "qcamera.ts", "fcamera.hevc", "dcamera.hevc", "ecamera.hevc"} + streams = [(VisionStreamType.VISION_STREAM_ROAD, (d.fcam.width, d.fcam.height, 2048*2346, 2048, 2048*1216), "roadCameraState"), + (VisionStreamType.VISION_STREAM_DRIVER, (d.dcam.width, d.dcam.height, 2048*2346, 2048, 2048*1216), "driverCameraState"), + (VisionStreamType.VISION_STREAM_WIDE_ROAD, (d.ecam.width, d.ecam.height, 2048*2346, 2048, 2048*1216), "wideRoadCameraState")] + + pm = messaging.PubMaster(["roadCameraState", "driverCameraState", "wideRoadCameraState"]) + vipc_server = VisionIpcServer("camerad") + for stream_type, frame_spec, _ in streams: + vipc_server.create_buffers_with_sizes(stream_type, 40, False, *(frame_spec)) + vipc_server.start_listener() + + num_segs = random.randint(2, 5) + length = random.randint(1, 3) + os.environ["LOGGERD_SEGMENT_LENGTH"] = str(length) + managed_processes["loggerd"].start() + managed_processes["encoderd"].start() + assert pm.wait_for_readers_to_update("roadCameraState", timeout=5) + + fps = 20.0 + for n in range(1, int(num_segs*length*fps)+1): + for stream_type, frame_spec, state in streams: + dat = np.empty(frame_spec[2], dtype=np.uint8) + vipc_server.send(stream_type, dat[:].flatten().tobytes(), n, n/fps, n/fps) + + camera_state = messaging.new_message(state) + frame = getattr(camera_state, state) + frame.frameId = n + pm.send(state, camera_state) + + for _, _, state in streams: + assert pm.wait_for_readers_to_update(state, timeout=5, dt=0.001) + + managed_processes["loggerd"].stop() + managed_processes["encoderd"].stop() + + route_path = str(self._get_latest_log_dir()).rsplit("--", 1)[0] + for n in range(num_segs): + p = Path(f"{route_path}--{n}") + logged = {f.name for f in p.iterdir() if f.is_file()} + diff = logged ^ expected_files + assert len(diff) == 0, f"didn't get all expected files. run={_} seg={n} {route_path=}, {diff=}\n{logged=} {expected_files=}" + + def test_bootlog(self): + # generate bootlog with fake launch log + launch_log = ''.join(str(random.choice(string.printable)) for _ in range(100)) + with open("/tmp/launch_log", "w") as f: + f.write(launch_log) + + bootlog_path = self._gen_bootlog() + lr = list(LogReader(str(bootlog_path))) + + # check length + assert len(lr) == 2 # boot + initData + + self._check_init_data(lr) + + # check msgs + bootlog_msgs = [m for m in lr if m.which() == 'boot'] + assert len(bootlog_msgs) == 1 + + # sanity check values + boot = bootlog_msgs.pop().boot + assert abs(boot.wallTimeNanos - time.time_ns()) < 5*1e9 # within 5s + assert boot.launchLog == launch_log + + for fn in ["console-ramoops", "pmsg-ramoops-0"]: + path = Path(os.path.join("/sys/fs/pstore/", fn)) + if path.is_file(): + with open(path, "rb") as f: + expected_val = f.read() + bootlog_val = [e.value for e in boot.pstore.entries if e.key == fn][0] + assert expected_val == bootlog_val + + # next one should increment by one + bl1 = re.match(RE.LOG_ID_V2, bootlog_path.name) + bl2 = re.match(RE.LOG_ID_V2, self._gen_bootlog().name) + assert bl1.group('uid') != bl2.group('uid') + assert int(bl1.group('count')) == 0 and int(bl2.group('count')) == 1 + + def test_qlog(self): + qlog_services = [s for s in CEREAL_SERVICES if SERVICE_LIST[s].decimation is not None] + no_qlog_services = [s for s in CEREAL_SERVICES if SERVICE_LIST[s].decimation is None] + + services = random.sample(qlog_services, random.randint(2, min(10, len(qlog_services)))) + \ + random.sample(no_qlog_services, random.randint(2, min(10, len(no_qlog_services)))) + sent_msgs = self._publish_random_messages(services) + + qlog_path = os.path.join(self._get_latest_log_dir(), "qlog") + lr = list(LogReader(qlog_path)) + + # check initData and sentinel + self._check_init_data(lr) + self._check_sentinel(lr, True) + + recv_msgs = defaultdict(list) + for m in lr: + recv_msgs[m.which()].append(m) + + for s, msgs in sent_msgs.items(): + recv_cnt = len(recv_msgs[s]) + + if s in no_qlog_services: + # check services with no specific decimation aren't in qlog + assert recv_cnt == 0, f"got {recv_cnt} {s} msgs in qlog" + else: + # check logged message count matches decimation + expected_cnt = (len(msgs) - 1) // SERVICE_LIST[s].decimation + 1 + assert recv_cnt == expected_cnt, f"expected {expected_cnt} msgs for {s}, got {recv_cnt}" + + def test_rlog(self): + services = random.sample(CEREAL_SERVICES, random.randint(5, 10)) + sent_msgs = self._publish_random_messages(services) + + lr = list(LogReader(os.path.join(self._get_latest_log_dir(), "rlog"))) + + # check initData and sentinel + self._check_init_data(lr) + self._check_sentinel(lr, True) + + # check all messages were logged and in order + lr = lr[2:-1] # slice off initData and both sentinels + for m in lr: + sent = sent_msgs[m.which()].pop(0) + sent.clear_write_flag() + assert sent.to_bytes() == m.as_builder().to_bytes() + + def test_preserving_flagged_segments(self): + services = set(random.sample(CEREAL_SERVICES, random.randint(5, 10))) | {"userFlag"} + self._publish_random_messages(services) + + segment_dir = self._get_latest_log_dir() + assert getxattr(segment_dir, PRESERVE_ATTR_NAME) == PRESERVE_ATTR_VALUE + + def test_not_preserving_unflagged_segments(self): + services = set(random.sample(CEREAL_SERVICES, random.randint(5, 10))) - {"userFlag"} + self._publish_random_messages(services) + + segment_dir = self._get_latest_log_dir() + assert getxattr(segment_dir, PRESERVE_ATTR_NAME) is None + diff --git a/system/loggerd/tests/test_runner.cc b/system/loggerd/tests/test_runner.cc new file mode 100644 index 0000000..62bf747 --- /dev/null +++ b/system/loggerd/tests/test_runner.cc @@ -0,0 +1,2 @@ +#define CATCH_CONFIG_MAIN +#include "catch2/catch.hpp" diff --git a/system/loggerd/tests/test_uploader.py b/system/loggerd/tests/test_uploader.py new file mode 100644 index 0000000..73917a3 --- /dev/null +++ b/system/loggerd/tests/test_uploader.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python3 +import os +import time +import threading +import unittest +import logging +import json +from pathlib import Path +from openpilot.system.hardware.hw import Paths + +from openpilot.common.swaglog import cloudlog +from openpilot.system.loggerd.uploader import main, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE + +from openpilot.system.loggerd.tests.loggerd_tests_common import UploaderTestCase + + +class FakeLogHandler(logging.Handler): + def __init__(self): + logging.Handler.__init__(self) + self.reset() + + def reset(self): + self.upload_order = list() + self.upload_ignored = list() + + def emit(self, record): + try: + j = json.loads(record.getMessage()) + if j["event"] == "upload_success": + self.upload_order.append(j["key"]) + if j["event"] == "upload_ignored": + self.upload_ignored.append(j["key"]) + except Exception: + pass + +log_handler = FakeLogHandler() +cloudlog.addHandler(log_handler) + + +class TestUploader(UploaderTestCase): + def setUp(self): + super().setUp() + log_handler.reset() + + def start_thread(self): + self.end_event = threading.Event() + self.up_thread = threading.Thread(target=main, args=[self.end_event]) + self.up_thread.daemon = True + self.up_thread.start() + + def join_thread(self): + self.end_event.set() + self.up_thread.join() + + def gen_files(self, lock=False, xattr: bytes = None, boot=True) -> list[Path]: + f_paths = [] + for t in ["qlog", "rlog", "dcamera.hevc", "fcamera.hevc"]: + f_paths.append(self.make_file_with_data(self.seg_dir, t, 1, lock=lock, upload_xattr=xattr)) + + if boot: + f_paths.append(self.make_file_with_data("boot", f"{self.seg_dir}", 1, lock=lock, upload_xattr=xattr)) + return f_paths + + def gen_order(self, seg1: list[int], seg2: list[int], boot=True) -> list[str]: + keys = [] + if boot: + keys += [f"boot/{self.seg_format.format(i)}.bz2" for i in seg1] + keys += [f"boot/{self.seg_format2.format(i)}.bz2" for i in seg2] + keys += [f"{self.seg_format.format(i)}/qlog.bz2" for i in seg1] + keys += [f"{self.seg_format2.format(i)}/qlog.bz2" for i in seg2] + return keys + + def test_upload(self): + self.gen_files(lock=False) + + self.start_thread() + # allow enough time that files could upload twice if there is a bug in the logic + time.sleep(5) + self.join_thread() + + exp_order = self.gen_order([self.seg_num], []) + + self.assertTrue(len(log_handler.upload_ignored) == 0, "Some files were ignored") + self.assertFalse(len(log_handler.upload_order) < len(exp_order), "Some files failed to upload") + self.assertFalse(len(log_handler.upload_order) > len(exp_order), "Some files were uploaded twice") + for f_path in exp_order: + self.assertEqual(os.getxattr((Path(Paths.log_root()) / f_path).with_suffix(""), UPLOAD_ATTR_NAME), UPLOAD_ATTR_VALUE, "All files not uploaded") + + self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order") + + def test_upload_with_wrong_xattr(self): + self.gen_files(lock=False, xattr=b'0') + + self.start_thread() + # allow enough time that files could upload twice if there is a bug in the logic + time.sleep(5) + self.join_thread() + + exp_order = self.gen_order([self.seg_num], []) + + self.assertTrue(len(log_handler.upload_ignored) == 0, "Some files were ignored") + self.assertFalse(len(log_handler.upload_order) < len(exp_order), "Some files failed to upload") + self.assertFalse(len(log_handler.upload_order) > len(exp_order), "Some files were uploaded twice") + for f_path in exp_order: + self.assertEqual(os.getxattr((Path(Paths.log_root()) / f_path).with_suffix(""), UPLOAD_ATTR_NAME), UPLOAD_ATTR_VALUE, "All files not uploaded") + + self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order") + + def test_upload_ignored(self): + self.set_ignore() + self.gen_files(lock=False) + + self.start_thread() + # allow enough time that files could upload twice if there is a bug in the logic + time.sleep(5) + self.join_thread() + + exp_order = self.gen_order([self.seg_num], []) + + self.assertTrue(len(log_handler.upload_order) == 0, "Some files were not ignored") + self.assertFalse(len(log_handler.upload_ignored) < len(exp_order), "Some files failed to ignore") + self.assertFalse(len(log_handler.upload_ignored) > len(exp_order), "Some files were ignored twice") + for f_path in exp_order: + self.assertEqual(os.getxattr((Path(Paths.log_root()) / f_path).with_suffix(""), UPLOAD_ATTR_NAME), UPLOAD_ATTR_VALUE, "All files not ignored") + + self.assertTrue(log_handler.upload_ignored == exp_order, "Files ignored in wrong order") + + def test_upload_files_in_create_order(self): + seg1_nums = [0, 1, 2, 10, 20] + for i in seg1_nums: + self.seg_dir = self.seg_format.format(i) + self.gen_files(boot=False) + seg2_nums = [5, 50, 51] + for i in seg2_nums: + self.seg_dir = self.seg_format2.format(i) + self.gen_files(boot=False) + + exp_order = self.gen_order(seg1_nums, seg2_nums, boot=False) + + self.start_thread() + # allow enough time that files could upload twice if there is a bug in the logic + time.sleep(5) + self.join_thread() + + self.assertTrue(len(log_handler.upload_ignored) == 0, "Some files were ignored") + self.assertFalse(len(log_handler.upload_order) < len(exp_order), "Some files failed to upload") + self.assertFalse(len(log_handler.upload_order) > len(exp_order), "Some files were uploaded twice") + for f_path in exp_order: + self.assertEqual(os.getxattr((Path(Paths.log_root()) / f_path).with_suffix(""), UPLOAD_ATTR_NAME), UPLOAD_ATTR_VALUE, "All files not uploaded") + + self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order") + + def test_no_upload_with_lock_file(self): + self.start_thread() + + time.sleep(0.25) + f_paths = self.gen_files(lock=True, boot=False) + + # allow enough time that files should have been uploaded if they would be uploaded + time.sleep(5) + self.join_thread() + + for f_path in f_paths: + fn = f_path.with_suffix(f_path.suffix.replace(".bz2", "")) + uploaded = UPLOAD_ATTR_NAME in os.listxattr(fn) and os.getxattr(fn, UPLOAD_ATTR_NAME) == UPLOAD_ATTR_VALUE + self.assertFalse(uploaded, "File upload when locked") + + def test_no_upload_with_xattr(self): + self.gen_files(lock=False, xattr=UPLOAD_ATTR_VALUE) + + self.start_thread() + # allow enough time that files could upload twice if there is a bug in the logic + time.sleep(5) + self.join_thread() + + self.assertEqual(len(log_handler.upload_order), 0, "File uploaded again") + + def test_clear_locks_on_startup(self): + f_paths = self.gen_files(lock=True, boot=False) + self.start_thread() + time.sleep(1) + self.join_thread() + + for f_path in f_paths: + lock_path = f_path.with_suffix(f_path.suffix + ".lock") + self.assertFalse(lock_path.is_file(), "File lock not cleared on startup") + + +if __name__ == "__main__": + unittest.main() diff --git a/system/loggerd/uploader.py b/system/loggerd/uploader.py index 5c8f253..cbe4069 100755 --- a/system/loggerd/uploader.py +++ b/system/loggerd/uploader.py @@ -9,10 +9,12 @@ import threading import time import traceback import datetime -from typing import BinaryIO, Iterator, List, Optional, Tuple +from typing import BinaryIO +from collections.abc import Iterator from cereal import log import cereal.messaging as messaging +import openpilot.selfdrive.sentry as sentry from openpilot.common.api import Api from openpilot.common.params import Params from openpilot.common.realtime import set_core_affinity @@ -42,10 +44,12 @@ class FakeResponse: self.request = FakeRequest() -def get_directory_sort(d: str) -> List[str]: - return [s.rjust(10, '0') for s in d.rsplit('--', 1)] +def get_directory_sort(d: str) -> list[str]: + # ensure old format is sorted sooner + o = ["0", ] if d.startswith("2024-") else ["1", ] + return o + [s.rjust(10, '0') for s in d.rsplit('--', 1)] -def listdir_by_creation(d: str) -> List[str]: +def listdir_by_creation(d: str) -> list[str]: if not os.path.isdir(d): return [] @@ -82,7 +86,7 @@ class Uploader: self.immediate_folders = ["crash/", "boot/"] self.immediate_priority = {"qlog": 0, "qlog.bz2": 0, "qcamera.ts": 1} - def list_upload_files(self, metered: bool) -> Iterator[Tuple[str, str, str]]: + def list_upload_files(self, metered: bool) -> Iterator[tuple[str, str, str]]: r = self.params.get("AthenadRecentlyViewedRoutes", encoding="utf8") requested_routes = [] if r is None else r.split(",") @@ -121,7 +125,7 @@ class Uploader: yield name, key, fn - def next_file_to_upload(self, metered: bool) -> Optional[Tuple[str, str, str]]: + def next_file_to_upload(self, metered: bool) -> tuple[str, str, str] | None: upload_files = list(self.list_upload_files(metered)) for name, key, fn in upload_files: @@ -207,7 +211,7 @@ class Uploader: return success - def step(self, network_type: int, metered: bool) -> Optional[bool]: + def step(self, network_type: int, metered: bool) -> bool | None: d = self.next_file_to_upload(metered) if d is None: return None @@ -221,7 +225,7 @@ class Uploader: return self.upload(name, key, fn, network_type, metered) -def main(exit_event: Optional[threading.Event] = None) -> None: +def main(exit_event: threading.Event = None) -> None: if exit_event is None: exit_event = threading.Event() @@ -247,7 +251,9 @@ def main(exit_event: Optional[threading.Event] = None) -> None: sm.update(0) offroad = params.get_bool("IsOffroad") network_type = sm['deviceState'].networkType if not force_wifi else NetworkType.wifi - if network_type == NetworkType.none: + at_home = not params.get_bool("DisableOnroadUploads") or offroad and network_type in (NetworkType.ethernet, NetworkType.wifi) + openpilot_crashed = os.path.isfile(os.path.join(sentry.CRASHES_DIR, 'error.txt')) + if network_type == NetworkType.none or not at_home or openpilot_crashed: if allow_sleep: time.sleep(60 if offroad else 5) continue diff --git a/system/loggerd/video_writer.cc b/system/loggerd/video_writer.cc new file mode 100644 index 0000000..90b5f1a --- /dev/null +++ b/system/loggerd/video_writer.cc @@ -0,0 +1,112 @@ +#pragma clang diagnostic ignored "-Wdeprecated-declarations" +#include + +#include "system/loggerd/video_writer.h" +#include "common/swaglog.h" +#include "common/util.h" + +VideoWriter::VideoWriter(const char *path, const char *filename, bool remuxing, int width, int height, int fps, cereal::EncodeIndex::Type codec) + : remuxing(remuxing) { + vid_path = util::string_format("%s/%s", path, filename); + lock_path = util::string_format("%s/%s.lock", path, filename); + + int lock_fd = HANDLE_EINTR(open(lock_path.c_str(), O_RDWR | O_CREAT, 0664)); + assert(lock_fd >= 0); + close(lock_fd); + + LOGD("encoder_open %s remuxing:%d", this->vid_path.c_str(), this->remuxing); + if (this->remuxing) { + bool raw = (codec == cereal::EncodeIndex::Type::BIG_BOX_LOSSLESS); + avformat_alloc_output_context2(&this->ofmt_ctx, NULL, raw ? "matroska" : NULL, this->vid_path.c_str()); + assert(this->ofmt_ctx); + + // set codec correctly. needed? + assert(codec != cereal::EncodeIndex::Type::FULL_H_E_V_C); + const AVCodec *avcodec = avcodec_find_encoder(raw ? AV_CODEC_ID_FFVHUFF : AV_CODEC_ID_H264); + assert(avcodec); + + this->codec_ctx = avcodec_alloc_context3(avcodec); + assert(this->codec_ctx); + this->codec_ctx->width = width; + this->codec_ctx->height = height; + this->codec_ctx->pix_fmt = AV_PIX_FMT_YUV420P; + this->codec_ctx->time_base = (AVRational){ 1, fps }; + + if (codec == cereal::EncodeIndex::Type::BIG_BOX_LOSSLESS) { + // without this, there's just noise + int err = avcodec_open2(this->codec_ctx, avcodec, NULL); + assert(err >= 0); + } + + this->out_stream = avformat_new_stream(this->ofmt_ctx, raw ? avcodec : NULL); + assert(this->out_stream); + + int err = avio_open(&this->ofmt_ctx->pb, this->vid_path.c_str(), AVIO_FLAG_WRITE); + assert(err >= 0); + + } else { + this->of = util::safe_fopen(this->vid_path.c_str(), "wb"); + assert(this->of); + } +} + +void VideoWriter::write(uint8_t *data, int len, long long timestamp, bool codecconfig, bool keyframe) { + if (of && data) { + size_t written = util::safe_fwrite(data, 1, len, of); + if (written != len) { + LOGE("failed to write file.errno=%d", errno); + } + } + + if (remuxing) { + if (codecconfig) { + if (len > 0) { + codec_ctx->extradata = (uint8_t*)av_mallocz(len + AV_INPUT_BUFFER_PADDING_SIZE); + codec_ctx->extradata_size = len; + memcpy(codec_ctx->extradata, data, len); + } + int err = avcodec_parameters_from_context(out_stream->codecpar, codec_ctx); + assert(err >= 0); + err = avformat_write_header(ofmt_ctx, NULL); + assert(err >= 0); + } else { + // input timestamps are in microseconds + AVRational in_timebase = {1, 1000000}; + + AVPacket pkt; + av_init_packet(&pkt); + pkt.data = data; + pkt.size = len; + + enum AVRounding rnd = static_cast(AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX); + pkt.pts = pkt.dts = av_rescale_q_rnd(timestamp, in_timebase, ofmt_ctx->streams[0]->time_base, rnd); + pkt.duration = av_rescale_q(50*1000, in_timebase, ofmt_ctx->streams[0]->time_base); + + if (keyframe) { + pkt.flags |= AV_PKT_FLAG_KEY; + } + + // TODO: can use av_write_frame for non raw? + int err = av_interleaved_write_frame(ofmt_ctx, &pkt); + if (err < 0) { LOGW("ts encoder write issue len: %d ts: %lld", len, timestamp); } + + av_packet_unref(&pkt); + } + } +} + +VideoWriter::~VideoWriter() { + if (this->remuxing) { + int err = av_write_trailer(this->ofmt_ctx); + if (err != 0) LOGE("av_write_trailer failed %d", err); + avcodec_free_context(&this->codec_ctx); + err = avio_closep(&this->ofmt_ctx->pb); + if (err != 0) LOGE("avio_closep failed %d", err); + avformat_free_context(this->ofmt_ctx); + } else { + util::safe_fflush(this->of); + fclose(this->of); + this->of = nullptr; + } + unlink(this->lock_path.c_str()); +} diff --git a/system/loggerd/video_writer.h b/system/loggerd/video_writer.h new file mode 100644 index 0000000..1aa758b --- /dev/null +++ b/system/loggerd/video_writer.h @@ -0,0 +1,25 @@ +#pragma once + +#include + +extern "C" { +#include +#include +} + +#include "cereal/messaging/messaging.h" + +class VideoWriter { +public: + VideoWriter(const char *path, const char *filename, bool remuxing, int width, int height, int fps, cereal::EncodeIndex::Type codec); + void write(uint8_t *data, int len, long long timestamp, bool codecconfig, bool keyframe); + ~VideoWriter(); +private: + std::string vid_path, lock_path; + FILE *of = nullptr; + + AVCodecContext *codec_ctx; + AVFormatContext *ofmt_ctx; + AVStream *out_stream; + bool remuxing; +}; diff --git a/system/loggerd/xattr_cache.py b/system/loggerd/xattr_cache.py index 5feeff3..d322011 100644 --- a/system/loggerd/xattr_cache.py +++ b/system/loggerd/xattr_cache.py @@ -1,10 +1,9 @@ import os import errno -from typing import Dict, Optional, Tuple -_cached_attributes: Dict[Tuple, Optional[bytes]] = {} +_cached_attributes: dict[tuple, bytes | None] = {} -def getxattr(path: str, attr_name: str) -> Optional[bytes]: +def getxattr(path: str, attr_name: str) -> bytes | None: key = (path, attr_name) if key not in _cached_attributes: try: