wip
This commit is contained in:
30
system/loggerd/README.md
Normal file
30
system/loggerd/README.md
Normal file
@@ -0,0 +1,30 @@
|
||||
# loggerd
|
||||
|
||||
openpilot records routes in one minute chunks called segments. A route starts on the rising edge of ignition and ends on the falling edge.
|
||||
|
||||
Check out our [python library](https://github.com/commaai/openpilot/blob/master/tools/lib/logreader.py) for reading openpilot logs. Also checkout our [tools](https://github.com/commaai/openpilot/tree/master/tools) to replay and view your data. These are the same tools we use to debug and develop openpilot.
|
||||
|
||||
## log types
|
||||
|
||||
For each segment, openpilot records the following log types:
|
||||
|
||||
## rlog.bz2
|
||||
|
||||
rlogs contain all the messages passed amongst openpilot's processes. See [cereal/services.py](https://github.com/commaai/cereal/blob/master/services.py) for a list of all the logged services. They're a bzip2 archive of the serialized capnproto messages.
|
||||
|
||||
## {f,e,d}camera.hevc
|
||||
|
||||
Each camera stream is H.265 encoded and written to its respective file.
|
||||
* fcamera.hevc is the road camera
|
||||
* ecamera.hevc is the wide road camera
|
||||
* dcamera.hevc is the driver camera
|
||||
|
||||
## qlog.bz2 & qcamera.ts
|
||||
|
||||
qlogs are a decimated subset of the rlogs. Check out [cereal/services.py](https://github.com/commaai/cereal/blob/master/services.py) for the decimation.
|
||||
|
||||
|
||||
qcameras are H.264 encoded, lower res versions of the fcamera.hevc. The video shown in [comma connect](https://connect.comma.ai/) is from the qcameras.
|
||||
|
||||
|
||||
qlogs and qcameras are designed to be small enough to upload instantly on slow internet and store forever, yet useful enough for most analysis and debugging.
|
||||
27
system/loggerd/SConscript
Normal file
27
system/loggerd/SConscript
Normal file
@@ -0,0 +1,27 @@
|
||||
Import('env', 'arch', 'cereal', 'messaging', 'common', 'visionipc')
|
||||
|
||||
libs = [common, cereal, messaging, visionipc,
|
||||
'zmq', 'capnp', 'kj', 'z',
|
||||
'avformat', 'avcodec', 'swscale', 'avutil',
|
||||
'yuv', 'OpenCL', 'pthread']
|
||||
|
||||
src = ['logger.cc', 'video_writer.cc', 'encoder/encoder.cc', 'encoder/v4l_encoder.cc']
|
||||
if arch != "larch64":
|
||||
src += ['encoder/ffmpeg_encoder.cc']
|
||||
|
||||
if arch == "Darwin":
|
||||
# fix OpenCL
|
||||
del libs[libs.index('OpenCL')]
|
||||
env['FRAMEWORKS'] = ['OpenCL']
|
||||
# exclude v4l
|
||||
del src[src.index('encoder/v4l_encoder.cc')]
|
||||
|
||||
logger_lib = env.Library('logger', src)
|
||||
libs.insert(0, logger_lib)
|
||||
|
||||
env.Program('loggerd', ['loggerd.cc'], LIBS=libs)
|
||||
env.Program('encoderd', ['encoderd.cc'], LIBS=libs)
|
||||
env.Program('bootlog.cc', LIBS=libs)
|
||||
|
||||
if GetOption('extras'):
|
||||
env.Program('tests/test_logger', ['tests/test_runner.cc', 'tests/test_logger.cc'], LIBS=libs + ['curl', 'crypto'])
|
||||
Binary file not shown.
70
system/loggerd/bootlog.cc
Normal file
70
system/loggerd/bootlog.cc
Normal file
@@ -0,0 +1,70 @@
|
||||
#include <cassert>
|
||||
#include <string>
|
||||
|
||||
#include "cereal/messaging/messaging.h"
|
||||
#include "common/params.h"
|
||||
#include "common/swaglog.h"
|
||||
#include "system/loggerd/logger.h"
|
||||
|
||||
|
||||
static kj::Array<capnp::word> build_boot_log() {
|
||||
MessageBuilder msg;
|
||||
auto boot = msg.initEvent().initBoot();
|
||||
|
||||
boot.setWallTimeNanos(nanos_since_epoch());
|
||||
|
||||
std::string pstore = "/sys/fs/pstore";
|
||||
std::map<std::string, std::string> pstore_map = util::read_files_in_dir(pstore);
|
||||
|
||||
int i = 0;
|
||||
auto lpstore = boot.initPstore().initEntries(pstore_map.size());
|
||||
for (auto& kv : pstore_map) {
|
||||
auto lentry = lpstore[i];
|
||||
lentry.setKey(kv.first);
|
||||
lentry.setValue(capnp::Data::Reader((const kj::byte*)kv.second.data(), kv.second.size()));
|
||||
i++;
|
||||
}
|
||||
|
||||
// Gather output of commands
|
||||
std::vector<std::string> bootlog_commands = {
|
||||
"[ -x \"$(command -v journalctl)\" ] && journalctl",
|
||||
};
|
||||
|
||||
if (Hardware::TICI()) {
|
||||
bootlog_commands.push_back("[ -e /dev/nvme0 ] && sudo nvme smart-log --output-format=json /dev/nvme0");
|
||||
}
|
||||
|
||||
auto commands = boot.initCommands().initEntries(bootlog_commands.size());
|
||||
for (int j = 0; j < bootlog_commands.size(); j++) {
|
||||
auto lentry = commands[j];
|
||||
|
||||
lentry.setKey(bootlog_commands[j]);
|
||||
|
||||
const std::string result = util::check_output(bootlog_commands[j]);
|
||||
lentry.setValue(capnp::Data::Reader((const kj::byte*)result.data(), result.size()));
|
||||
}
|
||||
|
||||
boot.setLaunchLog(util::read_file("/tmp/launch_log"));
|
||||
return capnp::messageToFlatArray(msg);
|
||||
}
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
const std::string id = logger_get_identifier("BootCount");
|
||||
const std::string path = Path::log_root() + "/boot/" + id;
|
||||
LOGW("bootlog to %s", path.c_str());
|
||||
|
||||
// Open bootlog
|
||||
bool r = util::create_directories(Path::log_root() + "/boot/", 0775);
|
||||
assert(r);
|
||||
|
||||
RawFile file(path.c_str());
|
||||
// Write initdata
|
||||
file.write(logger_build_init_data().asBytes());
|
||||
// Write bootlog
|
||||
file.write(build_boot_log().asBytes());
|
||||
|
||||
// Write out bootlog param to match routes with bootlog
|
||||
Params().put("CurrentBootlog", id.c_str());
|
||||
|
||||
return 0;
|
||||
}
|
||||
@@ -32,7 +32,7 @@ def get_used_bytes(default=None):
|
||||
try:
|
||||
statvfs = os.statvfs(Paths.log_root())
|
||||
total_bytes = statvfs.f_blocks * statvfs.f_frsize
|
||||
available_bytes = statvfs.f_bavail * statvfs.f_frsize
|
||||
available_bytes = get_available_bytes(default)
|
||||
used_bytes = total_bytes - available_bytes
|
||||
except OSError:
|
||||
used_bytes = default
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
import os
|
||||
import shutil
|
||||
import threading
|
||||
from typing import List
|
||||
from openpilot.system.hardware.hw import Paths
|
||||
from openpilot.common.swaglog import cloudlog
|
||||
from openpilot.system.loggerd.config import get_available_bytes, get_available_percent
|
||||
@@ -23,7 +22,7 @@ def has_preserve_xattr(d: str) -> bool:
|
||||
return getxattr(os.path.join(Paths.log_root(), d), PRESERVE_ATTR_NAME) == PRESERVE_ATTR_VALUE
|
||||
|
||||
|
||||
def get_preserved_segments(dirs_by_creation: List[str]) -> List[str]:
|
||||
def get_preserved_segments(dirs_by_creation: list[str]) -> list[str]:
|
||||
preserved = []
|
||||
for n, d in enumerate(filter(has_preserve_xattr, reversed(dirs_by_creation))):
|
||||
if n == PRESERVE_COUNT:
|
||||
|
||||
43
system/loggerd/encoder/encoder.cc
Normal file
43
system/loggerd/encoder/encoder.cc
Normal file
@@ -0,0 +1,43 @@
|
||||
#include "system/loggerd/encoder/encoder.h"
|
||||
|
||||
VideoEncoder::VideoEncoder(const EncoderInfo &encoder_info, int in_width, int in_height)
|
||||
: encoder_info(encoder_info), in_width(in_width), in_height(in_height) {
|
||||
|
||||
out_width = encoder_info.frame_width > 0 ? encoder_info.frame_width : in_width;
|
||||
out_height = encoder_info.frame_height > 0 ? encoder_info.frame_height : in_height;
|
||||
|
||||
pm.reset(new PubMaster({encoder_info.publish_name}));
|
||||
}
|
||||
|
||||
void VideoEncoder::publisher_publish(VideoEncoder *e, int segment_num, uint32_t idx, VisionIpcBufExtra &extra,
|
||||
unsigned int flags, kj::ArrayPtr<capnp::byte> header, kj::ArrayPtr<capnp::byte> dat) {
|
||||
// broadcast packet
|
||||
MessageBuilder msg;
|
||||
auto event = msg.initEvent(true);
|
||||
auto edat = (event.*(e->encoder_info.init_encode_data_func))();
|
||||
auto edata = edat.initIdx();
|
||||
struct timespec ts;
|
||||
timespec_get(&ts, TIME_UTC);
|
||||
edat.setUnixTimestampNanos((uint64_t)ts.tv_sec*1000000000 + ts.tv_nsec);
|
||||
edata.setFrameId(extra.frame_id);
|
||||
edata.setTimestampSof(extra.timestamp_sof);
|
||||
edata.setTimestampEof(extra.timestamp_eof);
|
||||
edata.setType(e->encoder_info.encode_type);
|
||||
edata.setEncodeId(e->cnt++);
|
||||
edata.setSegmentNum(segment_num);
|
||||
edata.setSegmentId(idx);
|
||||
edata.setFlags(flags);
|
||||
edata.setLen(dat.size());
|
||||
edat.setData(dat);
|
||||
edat.setWidth(out_width);
|
||||
edat.setHeight(out_height);
|
||||
if (flags & V4L2_BUF_FLAG_KEYFRAME) edat.setHeader(header);
|
||||
|
||||
uint32_t bytes_size = capnp::computeSerializedSizeInWords(msg) * sizeof(capnp::word);
|
||||
if (e->msg_cache.size() < bytes_size) {
|
||||
e->msg_cache.resize(bytes_size);
|
||||
}
|
||||
kj::ArrayOutputStream output_stream(kj::ArrayPtr<capnp::byte>(e->msg_cache.data(), bytes_size));
|
||||
capnp::writeMessage(output_stream, msg);
|
||||
e->pm->send(e->encoder_info.publish_name, e->msg_cache.data(), bytes_size);
|
||||
}
|
||||
37
system/loggerd/encoder/encoder.h
Normal file
37
system/loggerd/encoder/encoder.h
Normal file
@@ -0,0 +1,37 @@
|
||||
#pragma once
|
||||
|
||||
#include <cassert>
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include "cereal/messaging/messaging.h"
|
||||
#include "cereal/visionipc/visionipc.h"
|
||||
#include "common/queue.h"
|
||||
#include "system/camerad/cameras/camera_common.h"
|
||||
#include "system/loggerd/loggerd.h"
|
||||
|
||||
#define V4L2_BUF_FLAG_KEYFRAME 8
|
||||
|
||||
class VideoEncoder {
|
||||
public:
|
||||
VideoEncoder(const EncoderInfo &encoder_info, int in_width, int in_height);
|
||||
virtual ~VideoEncoder() {}
|
||||
virtual int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra) = 0;
|
||||
virtual void encoder_open(const char* path) = 0;
|
||||
virtual void encoder_close() = 0;
|
||||
|
||||
void publisher_publish(VideoEncoder *e, int segment_num, uint32_t idx, VisionIpcBufExtra &extra, unsigned int flags, kj::ArrayPtr<capnp::byte> header, kj::ArrayPtr<capnp::byte> dat);
|
||||
|
||||
protected:
|
||||
int in_width, in_height;
|
||||
int out_width, out_height;
|
||||
const EncoderInfo encoder_info;
|
||||
|
||||
private:
|
||||
// total frames encoded
|
||||
int cnt = 0;
|
||||
std::unique_ptr<PubMaster> pm;
|
||||
std::vector<capnp::byte> msg_cache;
|
||||
};
|
||||
150
system/loggerd/encoder/ffmpeg_encoder.cc
Normal file
150
system/loggerd/encoder/ffmpeg_encoder.cc
Normal file
@@ -0,0 +1,150 @@
|
||||
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
|
||||
|
||||
#include "system/loggerd/encoder/ffmpeg_encoder.h"
|
||||
|
||||
#include <fcntl.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include <cassert>
|
||||
#include <cstdio>
|
||||
#include <cstdlib>
|
||||
|
||||
#define __STDC_CONSTANT_MACROS
|
||||
|
||||
#include "third_party/libyuv/include/libyuv.h"
|
||||
|
||||
extern "C" {
|
||||
#include <libavcodec/avcodec.h>
|
||||
#include <libavformat/avformat.h>
|
||||
#include <libavutil/imgutils.h>
|
||||
}
|
||||
|
||||
#include "common/swaglog.h"
|
||||
#include "common/util.h"
|
||||
|
||||
const int env_debug_encoder = (getenv("DEBUG_ENCODER") != NULL) ? atoi(getenv("DEBUG_ENCODER")) : 0;
|
||||
|
||||
FfmpegEncoder::FfmpegEncoder(const EncoderInfo &encoder_info, int in_width, int in_height)
|
||||
: VideoEncoder(encoder_info, in_width, in_height) {
|
||||
frame = av_frame_alloc();
|
||||
assert(frame);
|
||||
frame->format = AV_PIX_FMT_YUV420P;
|
||||
frame->width = out_width;
|
||||
frame->height = out_height;
|
||||
frame->linesize[0] = out_width;
|
||||
frame->linesize[1] = out_width/2;
|
||||
frame->linesize[2] = out_width/2;
|
||||
|
||||
convert_buf.resize(in_width * in_height * 3 / 2);
|
||||
|
||||
if (in_width != out_width || in_height != out_height) {
|
||||
downscale_buf.resize(out_width * out_height * 3 / 2);
|
||||
}
|
||||
}
|
||||
|
||||
FfmpegEncoder::~FfmpegEncoder() {
|
||||
encoder_close();
|
||||
av_frame_free(&frame);
|
||||
}
|
||||
|
||||
void FfmpegEncoder::encoder_open(const char* path) {
|
||||
const AVCodec *codec = avcodec_find_encoder(AV_CODEC_ID_FFVHUFF);
|
||||
|
||||
this->codec_ctx = avcodec_alloc_context3(codec);
|
||||
assert(this->codec_ctx);
|
||||
this->codec_ctx->width = frame->width;
|
||||
this->codec_ctx->height = frame->height;
|
||||
this->codec_ctx->pix_fmt = AV_PIX_FMT_YUV420P;
|
||||
this->codec_ctx->time_base = (AVRational){ 1, encoder_info.fps };
|
||||
int err = avcodec_open2(this->codec_ctx, codec, NULL);
|
||||
assert(err >= 0);
|
||||
|
||||
is_open = true;
|
||||
segment_num++;
|
||||
counter = 0;
|
||||
}
|
||||
|
||||
void FfmpegEncoder::encoder_close() {
|
||||
if (!is_open) return;
|
||||
|
||||
avcodec_free_context(&codec_ctx);
|
||||
is_open = false;
|
||||
}
|
||||
|
||||
int FfmpegEncoder::encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra) {
|
||||
assert(buf->width == this->in_width);
|
||||
assert(buf->height == this->in_height);
|
||||
|
||||
uint8_t *cy = convert_buf.data();
|
||||
uint8_t *cu = cy + in_width * in_height;
|
||||
uint8_t *cv = cu + (in_width / 2) * (in_height / 2);
|
||||
libyuv::NV12ToI420(buf->y, buf->stride,
|
||||
buf->uv, buf->stride,
|
||||
cy, in_width,
|
||||
cu, in_width/2,
|
||||
cv, in_width/2,
|
||||
in_width, in_height);
|
||||
|
||||
if (downscale_buf.size() > 0) {
|
||||
uint8_t *out_y = downscale_buf.data();
|
||||
uint8_t *out_u = out_y + frame->width * frame->height;
|
||||
uint8_t *out_v = out_u + (frame->width / 2) * (frame->height / 2);
|
||||
libyuv::I420Scale(cy, in_width,
|
||||
cu, in_width/2,
|
||||
cv, in_width/2,
|
||||
in_width, in_height,
|
||||
out_y, frame->width,
|
||||
out_u, frame->width/2,
|
||||
out_v, frame->width/2,
|
||||
frame->width, frame->height,
|
||||
libyuv::kFilterNone);
|
||||
frame->data[0] = out_y;
|
||||
frame->data[1] = out_u;
|
||||
frame->data[2] = out_v;
|
||||
} else {
|
||||
frame->data[0] = cy;
|
||||
frame->data[1] = cu;
|
||||
frame->data[2] = cv;
|
||||
}
|
||||
frame->pts = counter*50*1000; // 50ms per frame
|
||||
|
||||
int ret = counter;
|
||||
|
||||
int err = avcodec_send_frame(this->codec_ctx, frame);
|
||||
if (err < 0) {
|
||||
LOGE("avcodec_send_frame error %d", err);
|
||||
ret = -1;
|
||||
}
|
||||
|
||||
AVPacket pkt;
|
||||
av_init_packet(&pkt);
|
||||
pkt.data = NULL;
|
||||
pkt.size = 0;
|
||||
while (ret >= 0) {
|
||||
err = avcodec_receive_packet(this->codec_ctx, &pkt);
|
||||
if (err == AVERROR_EOF) {
|
||||
break;
|
||||
} else if (err == AVERROR(EAGAIN)) {
|
||||
// Encoder might need a few frames on startup to get started. Keep going
|
||||
ret = 0;
|
||||
break;
|
||||
} else if (err < 0) {
|
||||
LOGE("avcodec_receive_packet error %d", err);
|
||||
ret = -1;
|
||||
break;
|
||||
}
|
||||
|
||||
if (env_debug_encoder) {
|
||||
printf("%20s got %8d bytes flags %8x idx %4d id %8d\n", encoder_info.publish_name, pkt.size, pkt.flags, counter, extra->frame_id);
|
||||
}
|
||||
|
||||
publisher_publish(this, segment_num, counter, *extra,
|
||||
(pkt.flags & AV_PKT_FLAG_KEY) ? V4L2_BUF_FLAG_KEYFRAME : 0,
|
||||
kj::arrayPtr<capnp::byte>(pkt.data, (size_t)0), // TODO: get the header
|
||||
kj::arrayPtr<capnp::byte>(pkt.data, pkt.size));
|
||||
|
||||
counter++;
|
||||
}
|
||||
av_packet_unref(&pkt);
|
||||
return ret;
|
||||
}
|
||||
34
system/loggerd/encoder/ffmpeg_encoder.h
Normal file
34
system/loggerd/encoder/ffmpeg_encoder.h
Normal file
@@ -0,0 +1,34 @@
|
||||
#pragma once
|
||||
|
||||
#include <cstdio>
|
||||
#include <cstdlib>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
extern "C" {
|
||||
#include <libavcodec/avcodec.h>
|
||||
#include <libavformat/avformat.h>
|
||||
#include <libavutil/imgutils.h>
|
||||
}
|
||||
|
||||
#include "system/loggerd/encoder/encoder.h"
|
||||
#include "system/loggerd/loggerd.h"
|
||||
|
||||
class FfmpegEncoder : public VideoEncoder {
|
||||
public:
|
||||
FfmpegEncoder(const EncoderInfo &encoder_info, int in_width, int in_height);
|
||||
~FfmpegEncoder();
|
||||
int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra);
|
||||
void encoder_open(const char* path);
|
||||
void encoder_close();
|
||||
|
||||
private:
|
||||
int segment_num = -1;
|
||||
int counter = 0;
|
||||
bool is_open = false;
|
||||
|
||||
AVCodecContext *codec_ctx;
|
||||
AVFrame *frame = NULL;
|
||||
std::vector<uint8_t> convert_buf;
|
||||
std::vector<uint8_t> downscale_buf;
|
||||
};
|
||||
331
system/loggerd/encoder/v4l_encoder.cc
Normal file
331
system/loggerd/encoder/v4l_encoder.cc
Normal file
@@ -0,0 +1,331 @@
|
||||
#include <cassert>
|
||||
#include <string>
|
||||
#include <sys/ioctl.h>
|
||||
#include <poll.h>
|
||||
|
||||
#include "system/loggerd/encoder/v4l_encoder.h"
|
||||
#include "common/util.h"
|
||||
#include "common/timing.h"
|
||||
|
||||
#include "third_party/libyuv/include/libyuv.h"
|
||||
#include "third_party/linux/include/msm_media_info.h"
|
||||
|
||||
// has to be in this order
|
||||
#include "third_party/linux/include/v4l2-controls.h"
|
||||
#include <linux/videodev2.h>
|
||||
#define V4L2_QCOM_BUF_FLAG_CODECCONFIG 0x00020000
|
||||
#define V4L2_QCOM_BUF_FLAG_EOS 0x02000000
|
||||
|
||||
/*
|
||||
kernel debugging:
|
||||
echo 0xff > /sys/module/videobuf2_core/parameters/debug
|
||||
echo 0x7fffffff > /sys/kernel/debug/msm_vidc/debug_level
|
||||
echo 0xff > /sys/devices/platform/soc/aa00000.qcom,vidc/video4linux/video33/dev_debug
|
||||
*/
|
||||
const int env_debug_encoder = (getenv("DEBUG_ENCODER") != NULL) ? atoi(getenv("DEBUG_ENCODER")) : 0;
|
||||
|
||||
static void checked_ioctl(int fd, unsigned long request, void *argp) {
|
||||
int ret = util::safe_ioctl(fd, request, argp);
|
||||
if (ret != 0) {
|
||||
LOGE("checked_ioctl failed with error %d (%d %lx %p)", errno, fd, request, argp);
|
||||
assert(0);
|
||||
}
|
||||
}
|
||||
|
||||
static void dequeue_buffer(int fd, v4l2_buf_type buf_type, unsigned int *index=NULL, unsigned int *bytesused=NULL, unsigned int *flags=NULL, struct timeval *timestamp=NULL) {
|
||||
v4l2_plane plane = {0};
|
||||
v4l2_buffer v4l_buf = {
|
||||
.type = buf_type,
|
||||
.memory = V4L2_MEMORY_USERPTR,
|
||||
.m = { .planes = &plane, },
|
||||
.length = 1,
|
||||
};
|
||||
checked_ioctl(fd, VIDIOC_DQBUF, &v4l_buf);
|
||||
|
||||
if (index) *index = v4l_buf.index;
|
||||
if (bytesused) *bytesused = v4l_buf.m.planes[0].bytesused;
|
||||
if (flags) *flags = v4l_buf.flags;
|
||||
if (timestamp) *timestamp = v4l_buf.timestamp;
|
||||
assert(v4l_buf.m.planes[0].data_offset == 0);
|
||||
}
|
||||
|
||||
static void queue_buffer(int fd, v4l2_buf_type buf_type, unsigned int index, VisionBuf *buf, struct timeval timestamp={}) {
|
||||
v4l2_plane plane = {
|
||||
.length = (unsigned int)buf->len,
|
||||
.m = { .userptr = (unsigned long)buf->addr, },
|
||||
.bytesused = (uint32_t)buf->len,
|
||||
.reserved = {(unsigned int)buf->fd}
|
||||
};
|
||||
|
||||
v4l2_buffer v4l_buf = {
|
||||
.type = buf_type,
|
||||
.index = index,
|
||||
.memory = V4L2_MEMORY_USERPTR,
|
||||
.m = { .planes = &plane, },
|
||||
.length = 1,
|
||||
.flags = V4L2_BUF_FLAG_TIMESTAMP_COPY,
|
||||
.timestamp = timestamp
|
||||
};
|
||||
|
||||
checked_ioctl(fd, VIDIOC_QBUF, &v4l_buf);
|
||||
}
|
||||
|
||||
static void request_buffers(int fd, v4l2_buf_type buf_type, unsigned int count) {
|
||||
struct v4l2_requestbuffers reqbuf = {
|
||||
.type = buf_type,
|
||||
.memory = V4L2_MEMORY_USERPTR,
|
||||
.count = count
|
||||
};
|
||||
checked_ioctl(fd, VIDIOC_REQBUFS, &reqbuf);
|
||||
}
|
||||
|
||||
void V4LEncoder::dequeue_handler(V4LEncoder *e) {
|
||||
std::string dequeue_thread_name = "dq-"+std::string(e->encoder_info.publish_name);
|
||||
util::set_thread_name(dequeue_thread_name.c_str());
|
||||
|
||||
e->segment_num++;
|
||||
uint32_t idx = -1;
|
||||
bool exit = false;
|
||||
|
||||
// POLLIN is capture, POLLOUT is frame
|
||||
struct pollfd pfd;
|
||||
pfd.events = POLLIN | POLLOUT;
|
||||
pfd.fd = e->fd;
|
||||
|
||||
// save the header
|
||||
kj::Array<capnp::byte> header;
|
||||
|
||||
while (!exit) {
|
||||
int rc = poll(&pfd, 1, 1000);
|
||||
if (rc < 0) {
|
||||
if (errno != EINTR) {
|
||||
// TODO: exit encoder?
|
||||
// ignore the error and keep going
|
||||
LOGE("poll failed (%d - %d)", rc, errno);
|
||||
}
|
||||
continue;
|
||||
} else if (rc == 0) {
|
||||
LOGE("encoder dequeue poll timeout");
|
||||
continue;
|
||||
}
|
||||
|
||||
if (env_debug_encoder >= 2) {
|
||||
printf("%20s poll %x at %.2f ms\n", e->encoder_info.publish_name, pfd.revents, millis_since_boot());
|
||||
}
|
||||
|
||||
int frame_id = -1;
|
||||
if (pfd.revents & POLLIN) {
|
||||
unsigned int bytesused, flags, index;
|
||||
struct timeval timestamp;
|
||||
dequeue_buffer(e->fd, V4L2_BUF_TYPE_VIDEO_CAPTURE_MPLANE, &index, &bytesused, &flags, ×tamp);
|
||||
e->buf_out[index].sync(VISIONBUF_SYNC_FROM_DEVICE);
|
||||
uint8_t *buf = (uint8_t*)e->buf_out[index].addr;
|
||||
int64_t ts = timestamp.tv_sec * 1000000 + timestamp.tv_usec;
|
||||
|
||||
// eof packet, we exit
|
||||
if (flags & V4L2_QCOM_BUF_FLAG_EOS) {
|
||||
exit = true;
|
||||
} else if (flags & V4L2_QCOM_BUF_FLAG_CODECCONFIG) {
|
||||
// save header
|
||||
header = kj::heapArray<capnp::byte>(buf, bytesused);
|
||||
} else {
|
||||
VisionIpcBufExtra extra = e->extras.pop();
|
||||
assert(extra.timestamp_eof/1000 == ts); // stay in sync
|
||||
frame_id = extra.frame_id;
|
||||
++idx;
|
||||
e->publisher_publish(e, e->segment_num, idx, extra, flags, header, kj::arrayPtr<capnp::byte>(buf, bytesused));
|
||||
}
|
||||
|
||||
if (env_debug_encoder) {
|
||||
printf("%20s got(%d) %6d bytes flags %8x idx %3d/%4d id %8d ts %ld lat %.2f ms (%lu frames free)\n",
|
||||
e->encoder_info.publish_name, index, bytesused, flags, e->segment_num, idx, frame_id, ts, millis_since_boot()-(ts/1000.), e->free_buf_in.size());
|
||||
}
|
||||
|
||||
// requeue the buffer
|
||||
queue_buffer(e->fd, V4L2_BUF_TYPE_VIDEO_CAPTURE_MPLANE, index, &e->buf_out[index]);
|
||||
}
|
||||
|
||||
if (pfd.revents & POLLOUT) {
|
||||
unsigned int index;
|
||||
dequeue_buffer(e->fd, V4L2_BUF_TYPE_VIDEO_OUTPUT_MPLANE, &index);
|
||||
e->free_buf_in.push(index);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
V4LEncoder::V4LEncoder(const EncoderInfo &encoder_info, int in_width, int in_height)
|
||||
: VideoEncoder(encoder_info, in_width, in_height) {
|
||||
fd = open("/dev/v4l/by-path/platform-aa00000.qcom_vidc-video-index1", O_RDWR|O_NONBLOCK);
|
||||
assert(fd >= 0);
|
||||
|
||||
struct v4l2_capability cap;
|
||||
checked_ioctl(fd, VIDIOC_QUERYCAP, &cap);
|
||||
LOGD("opened encoder device %s %s = %d", cap.driver, cap.card, fd);
|
||||
assert(strcmp((const char *)cap.driver, "msm_vidc_driver") == 0);
|
||||
assert(strcmp((const char *)cap.card, "msm_vidc_venc") == 0);
|
||||
|
||||
struct v4l2_format fmt_out = {
|
||||
.type = V4L2_BUF_TYPE_VIDEO_CAPTURE_MPLANE,
|
||||
.fmt = {
|
||||
.pix_mp = {
|
||||
// downscales are free with v4l
|
||||
.width = (unsigned int)(out_width),
|
||||
.height = (unsigned int)(out_height),
|
||||
.pixelformat = (encoder_info.encode_type == cereal::EncodeIndex::Type::FULL_H_E_V_C) ? V4L2_PIX_FMT_HEVC : V4L2_PIX_FMT_H264,
|
||||
.field = V4L2_FIELD_ANY,
|
||||
.colorspace = V4L2_COLORSPACE_DEFAULT,
|
||||
}
|
||||
}
|
||||
};
|
||||
checked_ioctl(fd, VIDIOC_S_FMT, &fmt_out);
|
||||
|
||||
v4l2_streamparm streamparm = {
|
||||
.type = V4L2_BUF_TYPE_VIDEO_OUTPUT_MPLANE,
|
||||
.parm = {
|
||||
.output = {
|
||||
// TODO: more stuff here? we don't know
|
||||
.timeperframe = {
|
||||
.numerator = 1,
|
||||
.denominator = 20
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
checked_ioctl(fd, VIDIOC_S_PARM, &streamparm);
|
||||
|
||||
struct v4l2_format fmt_in = {
|
||||
.type = V4L2_BUF_TYPE_VIDEO_OUTPUT_MPLANE,
|
||||
.fmt = {
|
||||
.pix_mp = {
|
||||
.width = (unsigned int)in_width,
|
||||
.height = (unsigned int)in_height,
|
||||
.pixelformat = V4L2_PIX_FMT_NV12,
|
||||
.field = V4L2_FIELD_ANY,
|
||||
.colorspace = V4L2_COLORSPACE_470_SYSTEM_BG,
|
||||
}
|
||||
}
|
||||
};
|
||||
checked_ioctl(fd, VIDIOC_S_FMT, &fmt_in);
|
||||
|
||||
LOGD("in buffer size %d, out buffer size %d",
|
||||
fmt_in.fmt.pix_mp.plane_fmt[0].sizeimage,
|
||||
fmt_out.fmt.pix_mp.plane_fmt[0].sizeimage);
|
||||
|
||||
// shared ctrls
|
||||
{
|
||||
struct v4l2_control ctrls[] = {
|
||||
{ .id = V4L2_CID_MPEG_VIDEO_HEADER_MODE, .value = V4L2_MPEG_VIDEO_HEADER_MODE_SEPARATE},
|
||||
{ .id = V4L2_CID_MPEG_VIDEO_BITRATE, .value = encoder_info.bitrate},
|
||||
{ .id = V4L2_CID_MPEG_VIDC_VIDEO_RATE_CONTROL, .value = V4L2_CID_MPEG_VIDC_VIDEO_RATE_CONTROL_VBR_CFR},
|
||||
{ .id = V4L2_CID_MPEG_VIDC_VIDEO_PRIORITY, .value = V4L2_MPEG_VIDC_VIDEO_PRIORITY_REALTIME_DISABLE},
|
||||
{ .id = V4L2_CID_MPEG_VIDC_VIDEO_IDR_PERIOD, .value = 1},
|
||||
};
|
||||
for (auto ctrl : ctrls) {
|
||||
checked_ioctl(fd, VIDIOC_S_CTRL, &ctrl);
|
||||
}
|
||||
}
|
||||
|
||||
if (encoder_info.encode_type == cereal::EncodeIndex::Type::FULL_H_E_V_C) {
|
||||
struct v4l2_control ctrls[] = {
|
||||
{ .id = V4L2_CID_MPEG_VIDC_VIDEO_HEVC_PROFILE, .value = V4L2_MPEG_VIDC_VIDEO_HEVC_PROFILE_MAIN},
|
||||
{ .id = V4L2_CID_MPEG_VIDC_VIDEO_HEVC_TIER_LEVEL, .value = V4L2_MPEG_VIDC_VIDEO_HEVC_LEVEL_HIGH_TIER_LEVEL_5},
|
||||
{ .id = V4L2_CID_MPEG_VIDC_VIDEO_VUI_TIMING_INFO, .value = V4L2_MPEG_VIDC_VIDEO_VUI_TIMING_INFO_ENABLED},
|
||||
{ .id = V4L2_CID_MPEG_VIDC_VIDEO_NUM_P_FRAMES, .value = 29},
|
||||
{ .id = V4L2_CID_MPEG_VIDC_VIDEO_NUM_B_FRAMES, .value = 0},
|
||||
};
|
||||
for (auto ctrl : ctrls) {
|
||||
checked_ioctl(fd, VIDIOC_S_CTRL, &ctrl);
|
||||
}
|
||||
} else {
|
||||
struct v4l2_control ctrls[] = {
|
||||
{ .id = V4L2_CID_MPEG_VIDEO_H264_PROFILE, .value = V4L2_MPEG_VIDEO_H264_PROFILE_HIGH},
|
||||
{ .id = V4L2_CID_MPEG_VIDEO_H264_LEVEL, .value = V4L2_MPEG_VIDEO_H264_LEVEL_UNKNOWN},
|
||||
{ .id = V4L2_CID_MPEG_VIDC_VIDEO_NUM_P_FRAMES, .value = 14},
|
||||
{ .id = V4L2_CID_MPEG_VIDC_VIDEO_NUM_B_FRAMES, .value = 0},
|
||||
{ .id = V4L2_CID_MPEG_VIDEO_H264_ENTROPY_MODE, .value = V4L2_MPEG_VIDEO_H264_ENTROPY_MODE_CABAC},
|
||||
{ .id = V4L2_CID_MPEG_VIDC_VIDEO_H264_CABAC_MODEL, .value = V4L2_CID_MPEG_VIDC_VIDEO_H264_CABAC_MODEL_0},
|
||||
{ .id = V4L2_CID_MPEG_VIDEO_H264_LOOP_FILTER_MODE, .value = 0},
|
||||
{ .id = V4L2_CID_MPEG_VIDEO_H264_LOOP_FILTER_ALPHA, .value = 0},
|
||||
{ .id = V4L2_CID_MPEG_VIDEO_H264_LOOP_FILTER_BETA, .value = 0},
|
||||
{ .id = V4L2_CID_MPEG_VIDEO_MULTI_SLICE_MODE, .value = 0},
|
||||
};
|
||||
for (auto ctrl : ctrls) {
|
||||
checked_ioctl(fd, VIDIOC_S_CTRL, &ctrl);
|
||||
}
|
||||
}
|
||||
|
||||
// allocate buffers
|
||||
request_buffers(fd, V4L2_BUF_TYPE_VIDEO_CAPTURE_MPLANE, BUF_OUT_COUNT);
|
||||
request_buffers(fd, V4L2_BUF_TYPE_VIDEO_OUTPUT_MPLANE, BUF_IN_COUNT);
|
||||
|
||||
// start encoder
|
||||
v4l2_buf_type buf_type = V4L2_BUF_TYPE_VIDEO_CAPTURE_MPLANE;
|
||||
checked_ioctl(fd, VIDIOC_STREAMON, &buf_type);
|
||||
buf_type = V4L2_BUF_TYPE_VIDEO_OUTPUT_MPLANE;
|
||||
checked_ioctl(fd, VIDIOC_STREAMON, &buf_type);
|
||||
|
||||
// queue up output buffers
|
||||
for (unsigned int i = 0; i < BUF_OUT_COUNT; i++) {
|
||||
buf_out[i].allocate(fmt_out.fmt.pix_mp.plane_fmt[0].sizeimage);
|
||||
queue_buffer(fd, V4L2_BUF_TYPE_VIDEO_CAPTURE_MPLANE, i, &buf_out[i]);
|
||||
}
|
||||
// queue up input buffers
|
||||
for (unsigned int i = 0; i < BUF_IN_COUNT; i++) {
|
||||
free_buf_in.push(i);
|
||||
}
|
||||
}
|
||||
|
||||
void V4LEncoder::encoder_open(const char* path) {
|
||||
dequeue_handler_thread = std::thread(V4LEncoder::dequeue_handler, this);
|
||||
this->is_open = true;
|
||||
this->counter = 0;
|
||||
}
|
||||
|
||||
int V4LEncoder::encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra) {
|
||||
struct timeval timestamp {
|
||||
.tv_sec = (long)(extra->timestamp_eof/1000000000),
|
||||
.tv_usec = (long)((extra->timestamp_eof/1000) % 1000000),
|
||||
};
|
||||
|
||||
// reserve buffer
|
||||
int buffer_in = free_buf_in.pop();
|
||||
|
||||
// push buffer
|
||||
extras.push(*extra);
|
||||
//buf->sync(VISIONBUF_SYNC_TO_DEVICE);
|
||||
queue_buffer(fd, V4L2_BUF_TYPE_VIDEO_OUTPUT_MPLANE, buffer_in, buf, timestamp);
|
||||
|
||||
return this->counter++;
|
||||
}
|
||||
|
||||
void V4LEncoder::encoder_close() {
|
||||
if (this->is_open) {
|
||||
// pop all the frames before closing, then put the buffers back
|
||||
for (int i = 0; i < BUF_IN_COUNT; i++) free_buf_in.pop();
|
||||
for (int i = 0; i < BUF_IN_COUNT; i++) free_buf_in.push(i);
|
||||
// no frames, stop the encoder
|
||||
struct v4l2_encoder_cmd encoder_cmd = { .cmd = V4L2_ENC_CMD_STOP };
|
||||
checked_ioctl(fd, VIDIOC_ENCODER_CMD, &encoder_cmd);
|
||||
// join waits for V4L2_QCOM_BUF_FLAG_EOS
|
||||
dequeue_handler_thread.join();
|
||||
assert(extras.empty());
|
||||
}
|
||||
this->is_open = false;
|
||||
}
|
||||
|
||||
V4LEncoder::~V4LEncoder() {
|
||||
encoder_close();
|
||||
v4l2_buf_type buf_type = V4L2_BUF_TYPE_VIDEO_OUTPUT_MPLANE;
|
||||
checked_ioctl(fd, VIDIOC_STREAMOFF, &buf_type);
|
||||
request_buffers(fd, V4L2_BUF_TYPE_VIDEO_OUTPUT_MPLANE, 0);
|
||||
buf_type = V4L2_BUF_TYPE_VIDEO_CAPTURE_MPLANE;
|
||||
checked_ioctl(fd, VIDIOC_STREAMOFF, &buf_type);
|
||||
request_buffers(fd, V4L2_BUF_TYPE_VIDEO_CAPTURE_MPLANE, 0);
|
||||
close(fd);
|
||||
|
||||
for (int i = 0; i < BUF_OUT_COUNT; i++) {
|
||||
if (buf_out[i].free() != 0) {
|
||||
LOGE("Failed to free buffer");
|
||||
}
|
||||
}
|
||||
}
|
||||
30
system/loggerd/encoder/v4l_encoder.h
Normal file
30
system/loggerd/encoder/v4l_encoder.h
Normal file
@@ -0,0 +1,30 @@
|
||||
#pragma once
|
||||
|
||||
#include "common/queue.h"
|
||||
#include "system/loggerd/encoder/encoder.h"
|
||||
|
||||
#define BUF_IN_COUNT 7
|
||||
#define BUF_OUT_COUNT 6
|
||||
|
||||
class V4LEncoder : public VideoEncoder {
|
||||
public:
|
||||
V4LEncoder(const EncoderInfo &encoder_info, int in_width, int in_height);
|
||||
~V4LEncoder();
|
||||
int encode_frame(VisionBuf* buf, VisionIpcBufExtra *extra);
|
||||
void encoder_open(const char* path);
|
||||
void encoder_close();
|
||||
private:
|
||||
int fd;
|
||||
|
||||
bool is_open = false;
|
||||
int segment_num = -1;
|
||||
int counter = 0;
|
||||
|
||||
SafeQueue<VisionIpcBufExtra> extras;
|
||||
|
||||
static void dequeue_handler(V4LEncoder *e);
|
||||
std::thread dequeue_handler_thread;
|
||||
|
||||
VisionBuf buf_out[BUF_OUT_COUNT];
|
||||
SafeQueue<unsigned int> free_buf_in;
|
||||
};
|
||||
Binary file not shown.
161
system/loggerd/encoderd.cc
Normal file
161
system/loggerd/encoderd.cc
Normal file
@@ -0,0 +1,161 @@
|
||||
#include <cassert>
|
||||
|
||||
#include "system/loggerd/loggerd.h"
|
||||
|
||||
#ifdef QCOM2
|
||||
#include "system/loggerd/encoder/v4l_encoder.h"
|
||||
#define Encoder V4LEncoder
|
||||
#else
|
||||
#include "system/loggerd/encoder/ffmpeg_encoder.h"
|
||||
#define Encoder FfmpegEncoder
|
||||
#endif
|
||||
|
||||
ExitHandler do_exit;
|
||||
|
||||
struct EncoderdState {
|
||||
int max_waiting = 0;
|
||||
|
||||
// Sync logic for startup
|
||||
std::atomic<int> encoders_ready = 0;
|
||||
std::atomic<uint32_t> start_frame_id = 0;
|
||||
bool camera_ready[WideRoadCam + 1] = {};
|
||||
bool camera_synced[WideRoadCam + 1] = {};
|
||||
};
|
||||
|
||||
// Handle initial encoder syncing by waiting for all encoders to reach the same frame id
|
||||
bool sync_encoders(EncoderdState *s, CameraType cam_type, uint32_t frame_id) {
|
||||
if (s->camera_synced[cam_type]) return true;
|
||||
|
||||
if (s->max_waiting > 1 && s->encoders_ready != s->max_waiting) {
|
||||
// add a small margin to the start frame id in case one of the encoders already dropped the next frame
|
||||
update_max_atomic(s->start_frame_id, frame_id + 2);
|
||||
if (std::exchange(s->camera_ready[cam_type], true) == false) {
|
||||
++s->encoders_ready;
|
||||
LOGD("camera %d encoder ready", cam_type);
|
||||
}
|
||||
return false;
|
||||
} else {
|
||||
if (s->max_waiting == 1) update_max_atomic(s->start_frame_id, frame_id);
|
||||
bool synced = frame_id >= s->start_frame_id;
|
||||
s->camera_synced[cam_type] = synced;
|
||||
if (!synced) LOGD("camera %d waiting for frame %d, cur %d", cam_type, (int)s->start_frame_id, frame_id);
|
||||
return synced;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void encoder_thread(EncoderdState *s, const LogCameraInfo &cam_info) {
|
||||
util::set_thread_name(cam_info.thread_name);
|
||||
|
||||
std::vector<std::unique_ptr<Encoder>> encoders;
|
||||
VisionIpcClient vipc_client = VisionIpcClient("camerad", cam_info.stream_type, false);
|
||||
|
||||
int cur_seg = 0;
|
||||
while (!do_exit) {
|
||||
if (!vipc_client.connect(false)) {
|
||||
util::sleep_for(5);
|
||||
continue;
|
||||
}
|
||||
|
||||
// init encoders
|
||||
if (encoders.empty()) {
|
||||
VisionBuf buf_info = vipc_client.buffers[0];
|
||||
LOGW("encoder %s init %zux%zu", cam_info.thread_name, buf_info.width, buf_info.height);
|
||||
assert(buf_info.width > 0 && buf_info.height > 0);
|
||||
|
||||
for (const auto &encoder_info : cam_info.encoder_infos) {
|
||||
auto &e = encoders.emplace_back(new Encoder(encoder_info, buf_info.width, buf_info.height));
|
||||
e->encoder_open(nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
bool lagging = false;
|
||||
while (!do_exit) {
|
||||
VisionIpcBufExtra extra;
|
||||
VisionBuf* buf = vipc_client.recv(&extra);
|
||||
if (buf == nullptr) continue;
|
||||
|
||||
// detect loop around and drop the frames
|
||||
if (buf->get_frame_id() != extra.frame_id) {
|
||||
if (!lagging) {
|
||||
LOGE("encoder %s lag buffer id: %" PRIu64 " extra id: %d", cam_info.thread_name, buf->get_frame_id(), extra.frame_id);
|
||||
lagging = true;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
lagging = false;
|
||||
|
||||
if (!sync_encoders(s, cam_info.type, extra.frame_id)) {
|
||||
continue;
|
||||
}
|
||||
if (do_exit) break;
|
||||
|
||||
// do rotation if required
|
||||
const int frames_per_seg = SEGMENT_LENGTH * MAIN_FPS;
|
||||
if (cur_seg >= 0 && extra.frame_id >= ((cur_seg + 1) * frames_per_seg) + s->start_frame_id) {
|
||||
for (auto &e : encoders) {
|
||||
e->encoder_close();
|
||||
e->encoder_open(NULL);
|
||||
}
|
||||
++cur_seg;
|
||||
}
|
||||
|
||||
// encode a frame
|
||||
for (int i = 0; i < encoders.size(); ++i) {
|
||||
int out_id = encoders[i]->encode_frame(buf, &extra);
|
||||
|
||||
if (out_id == -1) {
|
||||
LOGE("Failed to encode frame. frame_id: %d", extra.frame_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template <size_t N>
|
||||
void encoderd_thread(const LogCameraInfo (&cameras)[N]) {
|
||||
EncoderdState s;
|
||||
|
||||
std::set<VisionStreamType> streams;
|
||||
while (!do_exit) {
|
||||
streams = VisionIpcClient::getAvailableStreams("camerad", false);
|
||||
if (!streams.empty()) {
|
||||
break;
|
||||
}
|
||||
util::sleep_for(100);
|
||||
}
|
||||
|
||||
if (!streams.empty()) {
|
||||
std::vector<std::thread> encoder_threads;
|
||||
for (auto stream : streams) {
|
||||
auto it = std::find_if(std::begin(cameras), std::end(cameras),
|
||||
[stream](auto &cam) { return cam.stream_type == stream; });
|
||||
assert(it != std::end(cameras));
|
||||
++s.max_waiting;
|
||||
encoder_threads.push_back(std::thread(encoder_thread, &s, *it));
|
||||
}
|
||||
|
||||
for (auto &t : encoder_threads) t.join();
|
||||
}
|
||||
}
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
if (!Hardware::PC()) {
|
||||
int ret;
|
||||
ret = util::set_realtime_priority(52);
|
||||
assert(ret == 0);
|
||||
ret = util::set_core_affinity({3});
|
||||
assert(ret == 0);
|
||||
}
|
||||
if (argc > 1) {
|
||||
std::string arg1(argv[1]);
|
||||
if (arg1 == "--stream") {
|
||||
encoderd_thread(stream_cameras_logged);
|
||||
} else {
|
||||
LOGE("Argument '%s' is not supported", arg1.c_str());
|
||||
}
|
||||
} else {
|
||||
encoderd_thread(cameras_logged);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
163
system/loggerd/logger.cc
Normal file
163
system/loggerd/logger.cc
Normal file
@@ -0,0 +1,163 @@
|
||||
#include "system/loggerd/logger.h"
|
||||
|
||||
#include <fstream>
|
||||
#include <map>
|
||||
#include <vector>
|
||||
#include <iostream>
|
||||
#include <sstream>
|
||||
#include <random>
|
||||
|
||||
#include "common/params.h"
|
||||
#include "common/swaglog.h"
|
||||
#include "common/version.h"
|
||||
|
||||
// ***** log metadata *****
|
||||
kj::Array<capnp::word> logger_build_init_data() {
|
||||
uint64_t wall_time = nanos_since_epoch();
|
||||
|
||||
MessageBuilder msg;
|
||||
auto init = msg.initEvent().initInitData();
|
||||
|
||||
init.setWallTimeNanos(wall_time);
|
||||
init.setVersion(COMMA_VERSION);
|
||||
init.setDirty(!getenv("CLEAN"));
|
||||
init.setDeviceType(Hardware::get_device_type());
|
||||
|
||||
// log kernel args
|
||||
std::ifstream cmdline_stream("/proc/cmdline");
|
||||
std::vector<std::string> kernel_args;
|
||||
std::string buf;
|
||||
while (cmdline_stream >> buf) {
|
||||
kernel_args.push_back(buf);
|
||||
}
|
||||
|
||||
auto lkernel_args = init.initKernelArgs(kernel_args.size());
|
||||
for (int i=0; i<kernel_args.size(); i++) {
|
||||
lkernel_args.set(i, kernel_args[i]);
|
||||
}
|
||||
|
||||
init.setKernelVersion(util::read_file("/proc/version"));
|
||||
init.setOsVersion(util::read_file("/VERSION"));
|
||||
|
||||
// log params
|
||||
auto params = Params(util::getenv("PARAMS_COPY_PATH", ""));
|
||||
std::map<std::string, std::string> params_map = params.readAll();
|
||||
|
||||
init.setGitCommit(params_map["GitCommit"]);
|
||||
init.setGitCommitDate(params_map["GitCommitDate"]);
|
||||
init.setGitBranch(params_map["GitBranch"]);
|
||||
init.setGitRemote(params_map["GitRemote"]);
|
||||
init.setPassive(false);
|
||||
init.setDongleId(params_map["DongleId"]);
|
||||
|
||||
auto lparams = init.initParams().initEntries(params_map.size());
|
||||
int j = 0;
|
||||
for (auto& [key, value] : params_map) {
|
||||
auto lentry = lparams[j];
|
||||
lentry.setKey(key);
|
||||
if ( !(params.getKeyType(key) & DONT_LOG) ) {
|
||||
lentry.setValue(capnp::Data::Reader((const kj::byte*)value.data(), value.size()));
|
||||
}
|
||||
j++;
|
||||
}
|
||||
|
||||
// log commands
|
||||
std::vector<std::string> log_commands = {
|
||||
"df -h", // usage for all filesystems
|
||||
};
|
||||
|
||||
auto hw_logs = Hardware::get_init_logs();
|
||||
|
||||
auto commands = init.initCommands().initEntries(log_commands.size() + hw_logs.size());
|
||||
for (int i = 0; i < log_commands.size(); i++) {
|
||||
auto lentry = commands[i];
|
||||
|
||||
lentry.setKey(log_commands[i]);
|
||||
|
||||
const std::string result = util::check_output(log_commands[i]);
|
||||
lentry.setValue(capnp::Data::Reader((const kj::byte*)result.data(), result.size()));
|
||||
}
|
||||
|
||||
int i = log_commands.size();
|
||||
for (auto &[key, value] : hw_logs) {
|
||||
auto lentry = commands[i];
|
||||
lentry.setKey(key);
|
||||
lentry.setValue(capnp::Data::Reader((const kj::byte*)value.data(), value.size()));
|
||||
i++;
|
||||
}
|
||||
|
||||
return capnp::messageToFlatArray(msg);
|
||||
}
|
||||
|
||||
std::string logger_get_identifier(std::string key) {
|
||||
// a log identifier is a 32 bit counter, plus a 10 character unique ID.
|
||||
// e.g. 000001a3--c20ba54385
|
||||
|
||||
Params params;
|
||||
uint32_t cnt;
|
||||
try {
|
||||
cnt = std::stol(params.get(key));
|
||||
} catch (std::exception &e) {
|
||||
cnt = 0;
|
||||
}
|
||||
params.put(key, std::to_string(cnt + 1));
|
||||
|
||||
std::stringstream ss;
|
||||
std::random_device rd;
|
||||
std::mt19937 mt(rd());
|
||||
std::uniform_int_distribution<int> dist(0, 15);
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
ss << std::hex << dist(mt);
|
||||
}
|
||||
|
||||
return util::string_format("%08x--%s", cnt, ss.str().c_str());
|
||||
}
|
||||
|
||||
static void log_sentinel(LoggerState *log, SentinelType type, int eixt_signal = 0) {
|
||||
MessageBuilder msg;
|
||||
auto sen = msg.initEvent().initSentinel();
|
||||
sen.setType(type);
|
||||
sen.setSignal(eixt_signal);
|
||||
log->write(msg.toBytes(), true);
|
||||
}
|
||||
|
||||
LoggerState::LoggerState(const std::string &log_root) {
|
||||
route_name = logger_get_identifier("RouteCount");
|
||||
route_path = log_root + "/" + route_name;
|
||||
init_data = logger_build_init_data();
|
||||
}
|
||||
|
||||
LoggerState::~LoggerState() {
|
||||
if (rlog) {
|
||||
log_sentinel(this, SentinelType::END_OF_ROUTE, exit_signal);
|
||||
std::remove(lock_file.c_str());
|
||||
}
|
||||
}
|
||||
|
||||
bool LoggerState::next() {
|
||||
if (rlog) {
|
||||
log_sentinel(this, SentinelType::END_OF_SEGMENT);
|
||||
std::remove(lock_file.c_str());
|
||||
}
|
||||
|
||||
segment_path = route_path + "--" + std::to_string(++part);
|
||||
bool ret = util::create_directories(segment_path, 0775);
|
||||
assert(ret == true);
|
||||
|
||||
const std::string rlog_path = segment_path + "/rlog";
|
||||
lock_file = rlog_path + ".lock";
|
||||
std::ofstream{lock_file};
|
||||
|
||||
rlog.reset(new RawFile(rlog_path));
|
||||
qlog.reset(new RawFile(segment_path + "/qlog"));
|
||||
|
||||
// log init data & sentinel type.
|
||||
write(init_data.asBytes(), true);
|
||||
log_sentinel(this, part > 0 ? SentinelType::START_OF_SEGMENT : SentinelType::START_OF_ROUTE);
|
||||
return true;
|
||||
}
|
||||
|
||||
void LoggerState::write(uint8_t* data, size_t size, bool in_qlog) {
|
||||
rlog->write(data, size);
|
||||
if (in_qlog) qlog->write(data, size);
|
||||
}
|
||||
55
system/loggerd/logger.h
Normal file
55
system/loggerd/logger.h
Normal file
@@ -0,0 +1,55 @@
|
||||
#pragma once
|
||||
|
||||
#include <cassert>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
#include "cereal/messaging/messaging.h"
|
||||
#include "common/util.h"
|
||||
#include "system/hardware/hw.h"
|
||||
|
||||
class RawFile {
|
||||
public:
|
||||
RawFile(const std::string &path) {
|
||||
file = util::safe_fopen(path.c_str(), "wb");
|
||||
assert(file != nullptr);
|
||||
}
|
||||
~RawFile() {
|
||||
util::safe_fflush(file);
|
||||
int err = fclose(file);
|
||||
assert(err == 0);
|
||||
}
|
||||
inline void write(void* data, size_t size) {
|
||||
int written = util::safe_fwrite(data, 1, size, file);
|
||||
assert(written == size);
|
||||
}
|
||||
inline void write(kj::ArrayPtr<capnp::byte> array) { write(array.begin(), array.size()); }
|
||||
|
||||
private:
|
||||
FILE* file = nullptr;
|
||||
};
|
||||
|
||||
typedef cereal::Sentinel::SentinelType SentinelType;
|
||||
|
||||
|
||||
class LoggerState {
|
||||
public:
|
||||
LoggerState(const std::string& log_root = Path::log_root());
|
||||
~LoggerState();
|
||||
bool next();
|
||||
void write(uint8_t* data, size_t size, bool in_qlog);
|
||||
inline int segment() const { return part; }
|
||||
inline const std::string& segmentPath() const { return segment_path; }
|
||||
inline const std::string& routeName() const { return route_name; }
|
||||
inline void write(kj::ArrayPtr<kj::byte> bytes, bool in_qlog) { write(bytes.begin(), bytes.size(), in_qlog); }
|
||||
inline void setExitSignal(int signal) { exit_signal = signal; }
|
||||
|
||||
protected:
|
||||
int part = -1, exit_signal = 0;
|
||||
std::string route_path, route_name, segment_path, lock_file;
|
||||
kj::Array<capnp::word> init_data;
|
||||
std::unique_ptr<RawFile> rlog, qlog;
|
||||
};
|
||||
|
||||
kj::Array<capnp::word> logger_build_init_data();
|
||||
std::string logger_get_identifier(std::string key);
|
||||
Binary file not shown.
313
system/loggerd/loggerd.cc
Normal file
313
system/loggerd/loggerd.cc
Normal file
@@ -0,0 +1,313 @@
|
||||
#include <sys/xattr.h>
|
||||
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#include "common/params.h"
|
||||
#include "system/loggerd/encoder/encoder.h"
|
||||
#include "system/loggerd/loggerd.h"
|
||||
#include "system/loggerd/video_writer.h"
|
||||
|
||||
ExitHandler do_exit;
|
||||
|
||||
struct LoggerdState {
|
||||
LoggerState logger;
|
||||
std::atomic<double> last_camera_seen_tms;
|
||||
std::atomic<int> ready_to_rotate; // count of encoders ready to rotate
|
||||
int max_waiting = 0;
|
||||
double last_rotate_tms = 0.; // last rotate time in ms
|
||||
};
|
||||
|
||||
void logger_rotate(LoggerdState *s) {
|
||||
bool ret =s->logger.next();
|
||||
assert(ret);
|
||||
s->ready_to_rotate = 0;
|
||||
s->last_rotate_tms = millis_since_boot();
|
||||
LOGW((s->logger.segment() == 0) ? "logging to %s" : "rotated to %s", s->logger.segmentPath().c_str());
|
||||
}
|
||||
|
||||
void rotate_if_needed(LoggerdState *s) {
|
||||
// all encoders ready, trigger rotation
|
||||
bool all_ready = s->ready_to_rotate == s->max_waiting;
|
||||
|
||||
// fallback logic to prevent extremely long segments in the case of camera, encoder, etc. malfunctions
|
||||
bool timed_out = false;
|
||||
double tms = millis_since_boot();
|
||||
double seg_length_secs = (tms - s->last_rotate_tms) / 1000.;
|
||||
if ((seg_length_secs > SEGMENT_LENGTH) && !LOGGERD_TEST) {
|
||||
// TODO: might be nice to put these reasons in the sentinel
|
||||
if ((tms - s->last_camera_seen_tms) > NO_CAMERA_PATIENCE) {
|
||||
timed_out = true;
|
||||
LOGE("no camera packets seen. auto rotating");
|
||||
} else if (seg_length_secs > SEGMENT_LENGTH*1.2) {
|
||||
timed_out = true;
|
||||
LOGE("segment too long. auto rotating");
|
||||
}
|
||||
}
|
||||
|
||||
if (all_ready || timed_out) {
|
||||
logger_rotate(s);
|
||||
}
|
||||
}
|
||||
|
||||
struct RemoteEncoder {
|
||||
std::unique_ptr<VideoWriter> writer;
|
||||
int encoderd_segment_offset;
|
||||
int current_segment = -1;
|
||||
std::vector<Message *> q;
|
||||
int dropped_frames = 0;
|
||||
bool recording = false;
|
||||
bool marked_ready_to_rotate = false;
|
||||
bool seen_first_packet = false;
|
||||
};
|
||||
|
||||
int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct RemoteEncoder &re, const EncoderInfo &encoder_info) {
|
||||
int bytes_count = 0;
|
||||
|
||||
// extract the message
|
||||
capnp::FlatArrayMessageReader cmsg(kj::ArrayPtr<capnp::word>((capnp::word *)msg->getData(), msg->getSize() / sizeof(capnp::word)));
|
||||
auto event = cmsg.getRoot<cereal::Event>();
|
||||
auto edata = (event.*(encoder_info.get_encode_data_func))();
|
||||
auto idx = edata.getIdx();
|
||||
auto flags = idx.getFlags();
|
||||
|
||||
// encoderd can have started long before loggerd
|
||||
if (!re.seen_first_packet) {
|
||||
re.seen_first_packet = true;
|
||||
re.encoderd_segment_offset = idx.getSegmentNum();
|
||||
LOGD("%s: has encoderd offset %d", name.c_str(), re.encoderd_segment_offset);
|
||||
}
|
||||
int offset_segment_num = idx.getSegmentNum() - re.encoderd_segment_offset;
|
||||
|
||||
if (offset_segment_num == s->logger.segment()) {
|
||||
// loggerd is now on the segment that matches this packet
|
||||
|
||||
// if this is a new segment, we close any possible old segments, move to the new, and process any queued packets
|
||||
if (re.current_segment != s->logger.segment()) {
|
||||
if (re.recording) {
|
||||
re.writer.reset();
|
||||
re.recording = false;
|
||||
}
|
||||
re.current_segment = s->logger.segment();
|
||||
re.marked_ready_to_rotate = false;
|
||||
// we are in this segment now, process any queued messages before this one
|
||||
if (!re.q.empty()) {
|
||||
for (auto &qmsg : re.q) {
|
||||
bytes_count += handle_encoder_msg(s, qmsg, name, re, encoder_info);
|
||||
}
|
||||
re.q.clear();
|
||||
}
|
||||
}
|
||||
|
||||
// if we aren't recording yet, try to start, since we are in the correct segment
|
||||
if (!re.recording) {
|
||||
if (flags & V4L2_BUF_FLAG_KEYFRAME) {
|
||||
// only create on iframe
|
||||
if (re.dropped_frames) {
|
||||
// this should only happen for the first segment, maybe
|
||||
LOGW("%s: dropped %d non iframe packets before init", name.c_str(), re.dropped_frames);
|
||||
re.dropped_frames = 0;
|
||||
}
|
||||
// if we aren't actually recording, don't create the writer
|
||||
if (encoder_info.record) {
|
||||
assert(encoder_info.filename != NULL);
|
||||
re.writer.reset(new VideoWriter(s->logger.segmentPath().c_str(),
|
||||
encoder_info.filename, idx.getType() != cereal::EncodeIndex::Type::FULL_H_E_V_C,
|
||||
edata.getWidth(), edata.getHeight(), encoder_info.fps, idx.getType()));
|
||||
// write the header
|
||||
auto header = edata.getHeader();
|
||||
re.writer->write((uint8_t *)header.begin(), header.size(), idx.getTimestampEof()/1000, true, false);
|
||||
}
|
||||
re.recording = true;
|
||||
} else {
|
||||
// this is a sad case when we aren't recording, but don't have an iframe
|
||||
// nothing we can do but drop the frame
|
||||
delete msg;
|
||||
++re.dropped_frames;
|
||||
return bytes_count;
|
||||
}
|
||||
}
|
||||
|
||||
// we have to be recording if we are here
|
||||
assert(re.recording);
|
||||
|
||||
// if we are actually writing the video file, do so
|
||||
if (re.writer) {
|
||||
auto data = edata.getData();
|
||||
re.writer->write((uint8_t *)data.begin(), data.size(), idx.getTimestampEof()/1000, false, flags & V4L2_BUF_FLAG_KEYFRAME);
|
||||
}
|
||||
|
||||
// put it in log stream as the idx packet
|
||||
MessageBuilder bmsg;
|
||||
auto evt = bmsg.initEvent(event.getValid());
|
||||
evt.setLogMonoTime(event.getLogMonoTime());
|
||||
(evt.*(encoder_info.set_encode_idx_func))(idx);
|
||||
auto new_msg = bmsg.toBytes();
|
||||
s->logger.write((uint8_t *)new_msg.begin(), new_msg.size(), true); // always in qlog?
|
||||
bytes_count += new_msg.size();
|
||||
|
||||
// free the message, we used it
|
||||
delete msg;
|
||||
} else if (offset_segment_num > s->logger.segment()) {
|
||||
// encoderd packet has a newer segment, this means encoderd has rolled over
|
||||
if (!re.marked_ready_to_rotate) {
|
||||
re.marked_ready_to_rotate = true;
|
||||
++s->ready_to_rotate;
|
||||
LOGD("rotate %d -> %d ready %d/%d for %s",
|
||||
s->logger.segment(), offset_segment_num,
|
||||
s->ready_to_rotate.load(), s->max_waiting, name.c_str());
|
||||
}
|
||||
// queue up all the new segment messages, they go in after the rotate
|
||||
re.q.push_back(msg);
|
||||
} else {
|
||||
LOGE("%s: encoderd packet has a older segment!!! idx.getSegmentNum():%d s->logger.segment():%d re.encoderd_segment_offset:%d",
|
||||
name.c_str(), idx.getSegmentNum(), s->logger.segment(), re.encoderd_segment_offset);
|
||||
// free the message, it's useless. this should never happen
|
||||
// actually, this can happen if you restart encoderd
|
||||
re.encoderd_segment_offset = -s->logger.segment();
|
||||
delete msg;
|
||||
}
|
||||
|
||||
return bytes_count;
|
||||
}
|
||||
|
||||
void handle_user_flag(LoggerdState *s) {
|
||||
static int prev_segment = -1;
|
||||
if (s->logger.segment() == prev_segment) return;
|
||||
|
||||
LOGW("preserving %s", s->logger.segmentPath().c_str());
|
||||
|
||||
#ifdef __APPLE__
|
||||
int ret = setxattr(s->logger.segmentPath().c_str(), PRESERVE_ATTR_NAME, &PRESERVE_ATTR_VALUE, 1, 0, 0);
|
||||
#else
|
||||
int ret = setxattr(s->logger.segmentPath().c_str(), PRESERVE_ATTR_NAME, &PRESERVE_ATTR_VALUE, 1, 0);
|
||||
#endif
|
||||
if (ret) {
|
||||
LOGE("setxattr %s failed for %s: %s", PRESERVE_ATTR_NAME, s->logger.segmentPath().c_str(), strerror(errno));
|
||||
}
|
||||
|
||||
// mark route for uploading
|
||||
Params params;
|
||||
std::string routes = Params().get("AthenadRecentlyViewedRoutes");
|
||||
params.put("AthenadRecentlyViewedRoutes", routes + "," + s->logger.routeName());
|
||||
|
||||
prev_segment = s->logger.segment();
|
||||
}
|
||||
|
||||
void loggerd_thread() {
|
||||
// setup messaging
|
||||
typedef struct ServiceState {
|
||||
std::string name;
|
||||
int counter, freq;
|
||||
bool encoder, user_flag;
|
||||
} ServiceState;
|
||||
std::unordered_map<SubSocket*, ServiceState> service_state;
|
||||
std::unordered_map<SubSocket*, struct RemoteEncoder> remote_encoders;
|
||||
|
||||
std::unique_ptr<Context> ctx(Context::create());
|
||||
std::unique_ptr<Poller> poller(Poller::create());
|
||||
|
||||
// subscribe to all socks
|
||||
for (const auto& [_, it] : services) {
|
||||
const bool encoder = util::ends_with(it.name, "EncodeData");
|
||||
const bool livestream_encoder = util::starts_with(it.name, "livestream");
|
||||
if (!it.should_log && (!encoder || livestream_encoder)) continue;
|
||||
LOGD("logging %s (on port %d)", it.name.c_str(), it.port);
|
||||
|
||||
SubSocket * sock = SubSocket::create(ctx.get(), it.name);
|
||||
assert(sock != NULL);
|
||||
poller->registerSocket(sock);
|
||||
service_state[sock] = {
|
||||
.name = it.name,
|
||||
.counter = 0,
|
||||
.freq = it.decimation,
|
||||
.encoder = encoder,
|
||||
.user_flag = it.name == "userFlag",
|
||||
};
|
||||
}
|
||||
|
||||
LoggerdState s;
|
||||
// init logger
|
||||
logger_rotate(&s);
|
||||
Params().put("CurrentRoute", s.logger.routeName());
|
||||
|
||||
std::map<std::string, EncoderInfo> encoder_infos_dict;
|
||||
for (const auto &cam : cameras_logged) {
|
||||
for (const auto &encoder_info : cam.encoder_infos) {
|
||||
encoder_infos_dict[encoder_info.publish_name] = encoder_info;
|
||||
s.max_waiting++;
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t msg_count = 0, bytes_count = 0;
|
||||
double start_ts = millis_since_boot();
|
||||
while (!do_exit) {
|
||||
// poll for new messages on all sockets
|
||||
for (auto sock : poller->poll(1000)) {
|
||||
if (do_exit) break;
|
||||
|
||||
ServiceState &service = service_state[sock];
|
||||
if (service.user_flag) {
|
||||
handle_user_flag(&s);
|
||||
}
|
||||
|
||||
// drain socket
|
||||
int count = 0;
|
||||
Message *msg = nullptr;
|
||||
while (!do_exit && (msg = sock->receive(true))) {
|
||||
const bool in_qlog = service.freq != -1 && (service.counter++ % service.freq == 0);
|
||||
if (service.encoder) {
|
||||
s.last_camera_seen_tms = millis_since_boot();
|
||||
bytes_count += handle_encoder_msg(&s, msg, service.name, remote_encoders[sock], encoder_infos_dict[service.name]);
|
||||
} else {
|
||||
s.logger.write((uint8_t *)msg->getData(), msg->getSize(), in_qlog);
|
||||
bytes_count += msg->getSize();
|
||||
delete msg;
|
||||
}
|
||||
|
||||
rotate_if_needed(&s);
|
||||
|
||||
if ((++msg_count % 1000) == 0) {
|
||||
double seconds = (millis_since_boot() - start_ts) / 1000.0;
|
||||
LOGD("%" PRIu64 " messages, %.2f msg/sec, %.2f KB/sec", msg_count, msg_count / seconds, bytes_count * 0.001 / seconds);
|
||||
}
|
||||
|
||||
count++;
|
||||
if (count >= 200) {
|
||||
LOGD("large volume of '%s' messages", service.name.c_str());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
LOGW("closing logger");
|
||||
s.logger.setExitSignal(do_exit.signal);
|
||||
|
||||
if (do_exit.power_failure) {
|
||||
LOGE("power failure");
|
||||
sync();
|
||||
LOGE("sync done");
|
||||
}
|
||||
|
||||
// messaging cleanup
|
||||
for (auto &[sock, service] : service_state) delete sock;
|
||||
}
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
if (!Hardware::PC()) {
|
||||
int ret;
|
||||
ret = util::set_core_affinity({0, 1, 2, 3});
|
||||
assert(ret == 0);
|
||||
// TODO: why does this impact camerad timings?
|
||||
//ret = util::set_realtime_priority(1);
|
||||
//assert(ret == 0);
|
||||
}
|
||||
|
||||
loggerd_thread();
|
||||
|
||||
return 0;
|
||||
}
|
||||
154
system/loggerd/loggerd.h
Normal file
154
system/loggerd/loggerd.h
Normal file
@@ -0,0 +1,154 @@
|
||||
#pragma once
|
||||
|
||||
#include <vector>
|
||||
|
||||
#include "cereal/messaging/messaging.h"
|
||||
#include "cereal/services.h"
|
||||
#include "cereal/visionipc/visionipc_client.h"
|
||||
#include "system/camerad/cameras/camera_common.h"
|
||||
#include "system/hardware/hw.h"
|
||||
#include "common/params.h"
|
||||
#include "common/swaglog.h"
|
||||
#include "common/util.h"
|
||||
|
||||
#include "system/loggerd/logger.h"
|
||||
|
||||
constexpr int MAIN_FPS = 20;
|
||||
const int MAIN_BITRATE = Params().getBool("HigherBitrate") ? 20000000 : 1e7;
|
||||
const int LIVESTREAM_BITRATE = 1e6;
|
||||
const int QCAM_BITRATE = 256000;
|
||||
|
||||
#define NO_CAMERA_PATIENCE 500 // fall back to time-based rotation if all cameras are dead
|
||||
|
||||
#define INIT_ENCODE_FUNCTIONS(encode_type) \
|
||||
.get_encode_data_func = &cereal::Event::Reader::get##encode_type##Data, \
|
||||
.set_encode_idx_func = &cereal::Event::Builder::set##encode_type##Idx, \
|
||||
.init_encode_data_func = &cereal::Event::Builder::init##encode_type##Data
|
||||
|
||||
const bool LOGGERD_TEST = getenv("LOGGERD_TEST");
|
||||
const int SEGMENT_LENGTH = LOGGERD_TEST ? atoi(getenv("LOGGERD_SEGMENT_LENGTH")) : 60;
|
||||
|
||||
constexpr char PRESERVE_ATTR_NAME[] = "user.preserve";
|
||||
constexpr char PRESERVE_ATTR_VALUE = '1';
|
||||
class EncoderInfo {
|
||||
public:
|
||||
const char *publish_name;
|
||||
const char *filename = NULL;
|
||||
bool record = true;
|
||||
int frame_width = -1;
|
||||
int frame_height = -1;
|
||||
int fps = MAIN_FPS;
|
||||
int bitrate = MAIN_BITRATE;
|
||||
cereal::EncodeIndex::Type encode_type = Hardware::PC() ? cereal::EncodeIndex::Type::BIG_BOX_LOSSLESS
|
||||
: cereal::EncodeIndex::Type::FULL_H_E_V_C;
|
||||
::cereal::EncodeData::Reader (cereal::Event::Reader::*get_encode_data_func)() const;
|
||||
void (cereal::Event::Builder::*set_encode_idx_func)(::cereal::EncodeIndex::Reader);
|
||||
cereal::EncodeData::Builder (cereal::Event::Builder::*init_encode_data_func)();
|
||||
};
|
||||
|
||||
class LogCameraInfo {
|
||||
public:
|
||||
const char *thread_name;
|
||||
int fps = MAIN_FPS;
|
||||
CameraType type;
|
||||
VisionStreamType stream_type;
|
||||
std::vector<EncoderInfo> encoder_infos;
|
||||
};
|
||||
|
||||
const EncoderInfo main_road_encoder_info = {
|
||||
.publish_name = "roadEncodeData",
|
||||
.filename = "fcamera.hevc",
|
||||
INIT_ENCODE_FUNCTIONS(RoadEncode),
|
||||
};
|
||||
|
||||
const EncoderInfo main_wide_road_encoder_info = {
|
||||
.publish_name = "wideRoadEncodeData",
|
||||
.filename = "ecamera.hevc",
|
||||
INIT_ENCODE_FUNCTIONS(WideRoadEncode),
|
||||
};
|
||||
|
||||
const EncoderInfo main_driver_encoder_info = {
|
||||
.publish_name = "driverEncodeData",
|
||||
.filename = "dcamera.hevc",
|
||||
.record = Params().getBool("RecordFront"),
|
||||
INIT_ENCODE_FUNCTIONS(DriverEncode),
|
||||
};
|
||||
|
||||
const EncoderInfo stream_road_encoder_info = {
|
||||
.publish_name = "livestreamRoadEncodeData",
|
||||
.encode_type = cereal::EncodeIndex::Type::QCAMERA_H264,
|
||||
.record = false,
|
||||
.bitrate = LIVESTREAM_BITRATE,
|
||||
INIT_ENCODE_FUNCTIONS(LivestreamRoadEncode),
|
||||
};
|
||||
|
||||
const EncoderInfo stream_wide_road_encoder_info = {
|
||||
.publish_name = "livestreamWideRoadEncodeData",
|
||||
.encode_type = cereal::EncodeIndex::Type::QCAMERA_H264,
|
||||
.record = false,
|
||||
.bitrate = LIVESTREAM_BITRATE,
|
||||
INIT_ENCODE_FUNCTIONS(LivestreamWideRoadEncode),
|
||||
};
|
||||
|
||||
const EncoderInfo stream_driver_encoder_info = {
|
||||
.publish_name = "livestreamDriverEncodeData",
|
||||
.encode_type = cereal::EncodeIndex::Type::QCAMERA_H264,
|
||||
.record = false,
|
||||
.bitrate = LIVESTREAM_BITRATE,
|
||||
INIT_ENCODE_FUNCTIONS(LivestreamDriverEncode),
|
||||
};
|
||||
|
||||
const EncoderInfo qcam_encoder_info = {
|
||||
.publish_name = "qRoadEncodeData",
|
||||
.filename = "qcamera.ts",
|
||||
.bitrate = QCAM_BITRATE,
|
||||
.encode_type = cereal::EncodeIndex::Type::QCAMERA_H264,
|
||||
.frame_width = 526,
|
||||
.frame_height = 330,
|
||||
INIT_ENCODE_FUNCTIONS(QRoadEncode),
|
||||
};
|
||||
|
||||
const LogCameraInfo road_camera_info{
|
||||
.thread_name = "road_cam_encoder",
|
||||
.type = RoadCam,
|
||||
.stream_type = VISION_STREAM_ROAD,
|
||||
.encoder_infos = {main_road_encoder_info, qcam_encoder_info}
|
||||
};
|
||||
|
||||
const LogCameraInfo wide_road_camera_info{
|
||||
.thread_name = "wide_road_cam_encoder",
|
||||
.type = WideRoadCam,
|
||||
.stream_type = VISION_STREAM_WIDE_ROAD,
|
||||
.encoder_infos = {main_wide_road_encoder_info}
|
||||
};
|
||||
|
||||
const LogCameraInfo driver_camera_info{
|
||||
.thread_name = "driver_cam_encoder",
|
||||
.type = DriverCam,
|
||||
.stream_type = VISION_STREAM_DRIVER,
|
||||
.encoder_infos = {main_driver_encoder_info}
|
||||
};
|
||||
|
||||
const LogCameraInfo stream_road_camera_info{
|
||||
.thread_name = "road_cam_encoder",
|
||||
.type = RoadCam,
|
||||
.stream_type = VISION_STREAM_ROAD,
|
||||
.encoder_infos = {stream_road_encoder_info}
|
||||
};
|
||||
|
||||
const LogCameraInfo stream_wide_road_camera_info{
|
||||
.thread_name = "wide_road_cam_encoder",
|
||||
.type = WideRoadCam,
|
||||
.stream_type = VISION_STREAM_WIDE_ROAD,
|
||||
.encoder_infos = {stream_wide_road_encoder_info}
|
||||
};
|
||||
|
||||
const LogCameraInfo stream_driver_camera_info{
|
||||
.thread_name = "driver_cam_encoder",
|
||||
.type = DriverCam,
|
||||
.stream_type = VISION_STREAM_DRIVER,
|
||||
.encoder_infos = {stream_driver_encoder_info}
|
||||
};
|
||||
|
||||
const LogCameraInfo cameras_logged[] = {road_camera_info, wide_road_camera_info, driver_camera_info};
|
||||
const LogCameraInfo stream_cameras_logged[] = {stream_road_camera_info, stream_wide_road_camera_info, stream_driver_camera_info};
|
||||
0
system/loggerd/tests/__init__.py
Normal file
0
system/loggerd/tests/__init__.py
Normal file
91
system/loggerd/tests/loggerd_tests_common.py
Normal file
91
system/loggerd/tests/loggerd_tests_common.py
Normal file
@@ -0,0 +1,91 @@
|
||||
import os
|
||||
import random
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
import openpilot.system.loggerd.deleter as deleter
|
||||
import openpilot.system.loggerd.uploader as uploader
|
||||
from openpilot.common.params import Params
|
||||
from openpilot.system.hardware.hw import Paths
|
||||
from openpilot.system.loggerd.xattr_cache import setxattr
|
||||
|
||||
|
||||
def create_random_file(file_path: Path, size_mb: float, lock: bool = False, upload_xattr: bytes = None) -> None:
|
||||
file_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if lock:
|
||||
lock_path = str(file_path) + ".lock"
|
||||
os.close(os.open(lock_path, os.O_CREAT | os.O_EXCL))
|
||||
|
||||
chunks = 128
|
||||
chunk_bytes = int(size_mb * 1024 * 1024 / chunks)
|
||||
data = os.urandom(chunk_bytes)
|
||||
|
||||
with open(file_path, "wb") as f:
|
||||
for _ in range(chunks):
|
||||
f.write(data)
|
||||
|
||||
if upload_xattr is not None:
|
||||
setxattr(str(file_path), uploader.UPLOAD_ATTR_NAME, upload_xattr)
|
||||
|
||||
class MockResponse():
|
||||
def __init__(self, text, status_code):
|
||||
self.text = text
|
||||
self.status_code = status_code
|
||||
|
||||
class MockApi():
|
||||
def __init__(self, dongle_id):
|
||||
pass
|
||||
|
||||
def get(self, *args, **kwargs):
|
||||
return MockResponse('{"url": "http://localhost/does/not/exist", "headers": {}}', 200)
|
||||
|
||||
def get_token(self):
|
||||
return "fake-token"
|
||||
|
||||
class MockApiIgnore():
|
||||
def __init__(self, dongle_id):
|
||||
pass
|
||||
|
||||
def get(self, *args, **kwargs):
|
||||
return MockResponse('', 412)
|
||||
|
||||
def get_token(self):
|
||||
return "fake-token"
|
||||
|
||||
class UploaderTestCase(unittest.TestCase):
|
||||
f_type = "UNKNOWN"
|
||||
|
||||
root: Path
|
||||
seg_num: int
|
||||
seg_format: str
|
||||
seg_format2: str
|
||||
seg_dir: str
|
||||
|
||||
def set_ignore(self):
|
||||
uploader.Api = MockApiIgnore
|
||||
|
||||
def setUp(self):
|
||||
uploader.Api = MockApi
|
||||
uploader.fake_upload = True
|
||||
uploader.force_wifi = True
|
||||
uploader.allow_sleep = False
|
||||
self.seg_num = random.randint(1, 300)
|
||||
self.seg_format = "00000004--0ac3964c96--{}"
|
||||
self.seg_format2 = "00000005--4c4e99b08b--{}"
|
||||
self.seg_dir = self.seg_format.format(self.seg_num)
|
||||
|
||||
self.params = Params()
|
||||
self.params.put("IsOffroad", "1")
|
||||
self.params.put("DongleId", "0000000000000000")
|
||||
|
||||
def make_file_with_data(self, f_dir: str, fn: str, size_mb: float = .1, lock: bool = False,
|
||||
upload_xattr: bytes = None, preserve_xattr: bytes = None) -> Path:
|
||||
file_path = Path(Paths.log_root()) / f_dir / fn
|
||||
create_random_file(file_path, size_mb, lock, upload_xattr)
|
||||
|
||||
if preserve_xattr is not None:
|
||||
setxattr(str(file_path.parent), deleter.PRESERVE_ATTR_NAME, preserve_xattr)
|
||||
|
||||
return file_path
|
||||
123
system/loggerd/tests/test_deleter.py
Normal file
123
system/loggerd/tests/test_deleter.py
Normal file
@@ -0,0 +1,123 @@
|
||||
#!/usr/bin/env python3
|
||||
import time
|
||||
import threading
|
||||
import unittest
|
||||
from collections import namedtuple
|
||||
from pathlib import Path
|
||||
from collections.abc import Sequence
|
||||
|
||||
import openpilot.system.loggerd.deleter as deleter
|
||||
from openpilot.common.timeout import Timeout, TimeoutException
|
||||
from openpilot.system.loggerd.tests.loggerd_tests_common import UploaderTestCase
|
||||
|
||||
Stats = namedtuple("Stats", ['f_bavail', 'f_blocks', 'f_frsize'])
|
||||
|
||||
|
||||
class TestDeleter(UploaderTestCase):
|
||||
def fake_statvfs(self, d):
|
||||
return self.fake_stats
|
||||
|
||||
def setUp(self):
|
||||
self.f_type = "fcamera.hevc"
|
||||
super().setUp()
|
||||
self.fake_stats = Stats(f_bavail=0, f_blocks=10, f_frsize=4096)
|
||||
deleter.os.statvfs = self.fake_statvfs
|
||||
|
||||
def start_thread(self):
|
||||
self.end_event = threading.Event()
|
||||
self.del_thread = threading.Thread(target=deleter.deleter_thread, args=[self.end_event])
|
||||
self.del_thread.daemon = True
|
||||
self.del_thread.start()
|
||||
|
||||
def join_thread(self):
|
||||
self.end_event.set()
|
||||
self.del_thread.join()
|
||||
|
||||
def test_delete(self):
|
||||
f_path = self.make_file_with_data(self.seg_dir, self.f_type, 1)
|
||||
|
||||
self.start_thread()
|
||||
|
||||
try:
|
||||
with Timeout(2, "Timeout waiting for file to be deleted"):
|
||||
while f_path.exists():
|
||||
time.sleep(0.01)
|
||||
finally:
|
||||
self.join_thread()
|
||||
|
||||
def assertDeleteOrder(self, f_paths: Sequence[Path], timeout: int = 5) -> None:
|
||||
deleted_order = []
|
||||
|
||||
self.start_thread()
|
||||
try:
|
||||
with Timeout(timeout, "Timeout waiting for files to be deleted"):
|
||||
while True:
|
||||
for f in f_paths:
|
||||
if not f.exists() and f not in deleted_order:
|
||||
deleted_order.append(f)
|
||||
if len(deleted_order) == len(f_paths):
|
||||
break
|
||||
time.sleep(0.01)
|
||||
except TimeoutException:
|
||||
print("Not deleted:", [f for f in f_paths if f not in deleted_order])
|
||||
raise
|
||||
finally:
|
||||
self.join_thread()
|
||||
|
||||
self.assertEqual(deleted_order, f_paths, "Files not deleted in expected order")
|
||||
|
||||
def test_delete_order(self):
|
||||
self.assertDeleteOrder([
|
||||
self.make_file_with_data(self.seg_format.format(0), self.f_type),
|
||||
self.make_file_with_data(self.seg_format.format(1), self.f_type),
|
||||
self.make_file_with_data(self.seg_format2.format(0), self.f_type),
|
||||
])
|
||||
|
||||
def test_delete_many_preserved(self):
|
||||
self.assertDeleteOrder([
|
||||
self.make_file_with_data(self.seg_format.format(0), self.f_type),
|
||||
self.make_file_with_data(self.seg_format.format(1), self.f_type, preserve_xattr=deleter.PRESERVE_ATTR_VALUE),
|
||||
self.make_file_with_data(self.seg_format.format(2), self.f_type),
|
||||
] + [
|
||||
self.make_file_with_data(self.seg_format2.format(i), self.f_type, preserve_xattr=deleter.PRESERVE_ATTR_VALUE)
|
||||
for i in range(5)
|
||||
])
|
||||
|
||||
def test_delete_last(self):
|
||||
self.assertDeleteOrder([
|
||||
self.make_file_with_data(self.seg_format.format(1), self.f_type),
|
||||
self.make_file_with_data(self.seg_format2.format(0), self.f_type),
|
||||
self.make_file_with_data(self.seg_format.format(0), self.f_type, preserve_xattr=deleter.PRESERVE_ATTR_VALUE),
|
||||
self.make_file_with_data("boot", self.seg_format[:-4]),
|
||||
self.make_file_with_data("crash", self.seg_format2[:-4]),
|
||||
])
|
||||
|
||||
def test_no_delete_when_available_space(self):
|
||||
f_path = self.make_file_with_data(self.seg_dir, self.f_type)
|
||||
|
||||
block_size = 4096
|
||||
available = (10 * 1024 * 1024 * 1024) / block_size # 10GB free
|
||||
self.fake_stats = Stats(f_bavail=available, f_blocks=10, f_frsize=block_size)
|
||||
|
||||
self.start_thread()
|
||||
start_time = time.monotonic()
|
||||
while f_path.exists() and time.monotonic() - start_time < 2:
|
||||
time.sleep(0.01)
|
||||
self.join_thread()
|
||||
|
||||
self.assertTrue(f_path.exists(), "File deleted with available space")
|
||||
|
||||
def test_no_delete_with_lock_file(self):
|
||||
f_path = self.make_file_with_data(self.seg_dir, self.f_type, lock=True)
|
||||
|
||||
self.start_thread()
|
||||
start_time = time.monotonic()
|
||||
while f_path.exists() and time.monotonic() - start_time < 2:
|
||||
time.sleep(0.01)
|
||||
self.join_thread()
|
||||
|
||||
self.assertTrue(f_path.exists(), "File deleted when locked")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
156
system/loggerd/tests/test_encoder.py
Normal file
156
system/loggerd/tests/test_encoder.py
Normal file
@@ -0,0 +1,156 @@
|
||||
#!/usr/bin/env python3
|
||||
import math
|
||||
import os
|
||||
import pytest
|
||||
import random
|
||||
import shutil
|
||||
import subprocess
|
||||
import time
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
from parameterized import parameterized
|
||||
from tqdm import trange
|
||||
|
||||
from openpilot.common.params import Params
|
||||
from openpilot.common.timeout import Timeout
|
||||
from openpilot.system.hardware import TICI
|
||||
from openpilot.selfdrive.manager.process_config import managed_processes
|
||||
from openpilot.tools.lib.logreader import LogReader
|
||||
from openpilot.system.hardware.hw import Paths
|
||||
|
||||
SEGMENT_LENGTH = 2
|
||||
FULL_SIZE = 2507572
|
||||
CAMERAS = [
|
||||
("fcamera.hevc", 20, FULL_SIZE, "roadEncodeIdx"),
|
||||
("dcamera.hevc", 20, FULL_SIZE, "driverEncodeIdx"),
|
||||
("ecamera.hevc", 20, FULL_SIZE, "wideRoadEncodeIdx"),
|
||||
("qcamera.ts", 20, 130000, None),
|
||||
]
|
||||
|
||||
# we check frame count, so we don't have to be too strict on size
|
||||
FILE_SIZE_TOLERANCE = 0.5
|
||||
|
||||
|
||||
@pytest.mark.tici # TODO: all of loggerd should work on PC
|
||||
class TestEncoder(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self._clear_logs()
|
||||
os.environ["LOGGERD_TEST"] = "1"
|
||||
os.environ["LOGGERD_SEGMENT_LENGTH"] = str(SEGMENT_LENGTH)
|
||||
|
||||
def tearDown(self):
|
||||
self._clear_logs()
|
||||
|
||||
def _clear_logs(self):
|
||||
if os.path.exists(Paths.log_root()):
|
||||
shutil.rmtree(Paths.log_root())
|
||||
|
||||
def _get_latest_segment_path(self):
|
||||
last_route = sorted(Path(Paths.log_root()).iterdir())[-1]
|
||||
return os.path.join(Paths.log_root(), last_route)
|
||||
|
||||
# TODO: this should run faster than real time
|
||||
@parameterized.expand([(True, ), (False, )])
|
||||
def test_log_rotation(self, record_front):
|
||||
Params().put_bool("RecordFront", record_front)
|
||||
|
||||
managed_processes['sensord'].start()
|
||||
managed_processes['loggerd'].start()
|
||||
managed_processes['encoderd'].start()
|
||||
|
||||
time.sleep(1.0)
|
||||
managed_processes['camerad'].start()
|
||||
|
||||
num_segments = int(os.getenv("SEGMENTS", random.randint(10, 15)))
|
||||
|
||||
# wait for loggerd to make the dir for first segment
|
||||
route_prefix_path = None
|
||||
with Timeout(int(SEGMENT_LENGTH*3)):
|
||||
while route_prefix_path is None:
|
||||
try:
|
||||
route_prefix_path = self._get_latest_segment_path().rsplit("--", 1)[0]
|
||||
except Exception:
|
||||
time.sleep(0.1)
|
||||
|
||||
def check_seg(i):
|
||||
# check each camera file size
|
||||
counts = []
|
||||
first_frames = []
|
||||
for camera, fps, size, encode_idx_name in CAMERAS:
|
||||
if not record_front and "dcamera" in camera:
|
||||
continue
|
||||
|
||||
file_path = f"{route_prefix_path}--{i}/{camera}"
|
||||
|
||||
# check file exists
|
||||
self.assertTrue(os.path.exists(file_path), f"segment #{i}: '{file_path}' missing")
|
||||
|
||||
# TODO: this ffprobe call is really slow
|
||||
# check frame count
|
||||
cmd = f"ffprobe -v error -select_streams v:0 -count_packets -show_entries stream=nb_read_packets -of csv=p=0 {file_path}"
|
||||
if TICI:
|
||||
cmd = "LD_LIBRARY_PATH=/usr/local/lib " + cmd
|
||||
|
||||
expected_frames = fps * SEGMENT_LENGTH
|
||||
probe = subprocess.check_output(cmd, shell=True, encoding='utf8')
|
||||
frame_count = int(probe.split('\n')[0].strip())
|
||||
counts.append(frame_count)
|
||||
|
||||
self.assertEqual(frame_count, expected_frames,
|
||||
f"segment #{i}: {camera} failed frame count check: expected {expected_frames}, got {frame_count}")
|
||||
|
||||
# sanity check file size
|
||||
file_size = os.path.getsize(file_path)
|
||||
self.assertTrue(math.isclose(file_size, size, rel_tol=FILE_SIZE_TOLERANCE),
|
||||
f"{file_path} size {file_size} isn't close to target size {size}")
|
||||
|
||||
# Check encodeIdx
|
||||
if encode_idx_name is not None:
|
||||
rlog_path = f"{route_prefix_path}--{i}/rlog"
|
||||
msgs = [m for m in LogReader(rlog_path) if m.which() == encode_idx_name]
|
||||
encode_msgs = [getattr(m, encode_idx_name) for m in msgs]
|
||||
|
||||
valid = [m.valid for m in msgs]
|
||||
segment_idxs = [m.segmentId for m in encode_msgs]
|
||||
encode_idxs = [m.encodeId for m in encode_msgs]
|
||||
frame_idxs = [m.frameId for m in encode_msgs]
|
||||
|
||||
# Check frame count
|
||||
self.assertEqual(frame_count, len(segment_idxs))
|
||||
self.assertEqual(frame_count, len(encode_idxs))
|
||||
|
||||
# Check for duplicates or skips
|
||||
self.assertEqual(0, segment_idxs[0])
|
||||
self.assertEqual(len(set(segment_idxs)), len(segment_idxs))
|
||||
|
||||
self.assertTrue(all(valid))
|
||||
|
||||
self.assertEqual(expected_frames * i, encode_idxs[0])
|
||||
first_frames.append(frame_idxs[0])
|
||||
self.assertEqual(len(set(encode_idxs)), len(encode_idxs))
|
||||
|
||||
self.assertEqual(1, len(set(first_frames)))
|
||||
|
||||
if TICI:
|
||||
expected_frames = fps * SEGMENT_LENGTH
|
||||
self.assertEqual(min(counts), expected_frames)
|
||||
shutil.rmtree(f"{route_prefix_path}--{i}")
|
||||
|
||||
try:
|
||||
for i in trange(num_segments):
|
||||
# poll for next segment
|
||||
with Timeout(int(SEGMENT_LENGTH*10), error_msg=f"timed out waiting for segment {i}"):
|
||||
while Path(f"{route_prefix_path}--{i+1}") not in Path(Paths.log_root()).iterdir():
|
||||
time.sleep(0.1)
|
||||
check_seg(i)
|
||||
finally:
|
||||
managed_processes['loggerd'].stop()
|
||||
managed_processes['encoderd'].stop()
|
||||
managed_processes['camerad'].stop()
|
||||
managed_processes['sensord'].stop()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
74
system/loggerd/tests/test_logger.cc
Normal file
74
system/loggerd/tests/test_logger.cc
Normal file
@@ -0,0 +1,74 @@
|
||||
#include "catch2/catch.hpp"
|
||||
#include "system/loggerd/logger.h"
|
||||
|
||||
typedef cereal::Sentinel::SentinelType SentinelType;
|
||||
|
||||
void verify_segment(const std::string &route_path, int segment, int max_segment, int required_event_cnt) {
|
||||
const std::string segment_path = route_path + "--" + std::to_string(segment);
|
||||
SentinelType begin_sentinel = segment == 0 ? SentinelType::START_OF_ROUTE : SentinelType::START_OF_SEGMENT;
|
||||
SentinelType end_sentinel = segment == max_segment - 1 ? SentinelType::END_OF_ROUTE : SentinelType::END_OF_SEGMENT;
|
||||
|
||||
REQUIRE(!util::file_exists(segment_path + "/rlog.lock"));
|
||||
for (const char *fn : {"/rlog", "/qlog"}) {
|
||||
const std::string log_file = segment_path + fn;
|
||||
std::string log = util::read_file(log_file);
|
||||
REQUIRE(!log.empty());
|
||||
int event_cnt = 0, i = 0;
|
||||
kj::ArrayPtr<const capnp::word> words((capnp::word *)log.data(), log.size() / sizeof(capnp::word));
|
||||
while (words.size() > 0) {
|
||||
try {
|
||||
capnp::FlatArrayMessageReader reader(words);
|
||||
auto event = reader.getRoot<cereal::Event>();
|
||||
words = kj::arrayPtr(reader.getEnd(), words.end());
|
||||
if (i == 0) {
|
||||
REQUIRE(event.which() == cereal::Event::INIT_DATA);
|
||||
} else if (i == 1) {
|
||||
REQUIRE(event.which() == cereal::Event::SENTINEL);
|
||||
REQUIRE(event.getSentinel().getType() == begin_sentinel);
|
||||
REQUIRE(event.getSentinel().getSignal() == 0);
|
||||
} else if (words.size() > 0) {
|
||||
REQUIRE(event.which() == cereal::Event::CLOCKS);
|
||||
++event_cnt;
|
||||
} else {
|
||||
// the last event must be SENTINEL
|
||||
REQUIRE(event.which() == cereal::Event::SENTINEL);
|
||||
REQUIRE(event.getSentinel().getType() == end_sentinel);
|
||||
REQUIRE(event.getSentinel().getSignal() == (end_sentinel == SentinelType::END_OF_ROUTE ? 1 : 0));
|
||||
}
|
||||
++i;
|
||||
} catch (const kj::Exception &ex) {
|
||||
INFO("failed parse " << i << " exception :" << ex.getDescription());
|
||||
REQUIRE(0);
|
||||
break;
|
||||
}
|
||||
}
|
||||
REQUIRE(event_cnt == required_event_cnt);
|
||||
}
|
||||
}
|
||||
|
||||
void write_msg(LoggerState *logger) {
|
||||
MessageBuilder msg;
|
||||
msg.initEvent().initClocks();
|
||||
logger->write(msg.toBytes(), true);
|
||||
}
|
||||
|
||||
TEST_CASE("logger") {
|
||||
const int segment_cnt = 100;
|
||||
const std::string log_root = "/tmp/test_logger";
|
||||
system(("rm " + log_root + " -rf").c_str());
|
||||
std::string route_name;
|
||||
{
|
||||
LoggerState logger(log_root);
|
||||
route_name = logger.routeName();
|
||||
for (int i = 0; i < segment_cnt; ++i) {
|
||||
REQUIRE(logger.next());
|
||||
REQUIRE(util::file_exists(logger.segmentPath() + "/rlog.lock"));
|
||||
REQUIRE(logger.segment() == i);
|
||||
write_msg(&logger);
|
||||
}
|
||||
logger.setExitSignal(1);
|
||||
}
|
||||
for (int i = 0; i < segment_cnt; ++i) {
|
||||
verify_segment(log_root + "/" + route_name, i, segment_cnt, 1);
|
||||
}
|
||||
}
|
||||
285
system/loggerd/tests/test_loggerd.py
Normal file
285
system/loggerd/tests/test_loggerd.py
Normal file
@@ -0,0 +1,285 @@
|
||||
#!/usr/bin/env python3
|
||||
import numpy as np
|
||||
import os
|
||||
import re
|
||||
import random
|
||||
import string
|
||||
import subprocess
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from pathlib import Path
|
||||
from flaky import flaky
|
||||
|
||||
import cereal.messaging as messaging
|
||||
from cereal import log
|
||||
from cereal.services import SERVICE_LIST
|
||||
from openpilot.common.basedir import BASEDIR
|
||||
from openpilot.common.params import Params
|
||||
from openpilot.common.timeout import Timeout
|
||||
from openpilot.system.hardware.hw import Paths
|
||||
from openpilot.system.loggerd.xattr_cache import getxattr
|
||||
from openpilot.system.loggerd.deleter import PRESERVE_ATTR_NAME, PRESERVE_ATTR_VALUE
|
||||
from openpilot.selfdrive.manager.process_config import managed_processes
|
||||
from openpilot.system.version import get_version
|
||||
from openpilot.tools.lib.helpers import RE
|
||||
from openpilot.tools.lib.logreader import LogReader
|
||||
from cereal.visionipc import VisionIpcServer, VisionStreamType
|
||||
from openpilot.common.transformations.camera import DEVICE_CAMERAS
|
||||
|
||||
SentinelType = log.Sentinel.SentinelType
|
||||
|
||||
CEREAL_SERVICES = [f for f in log.Event.schema.union_fields if f in SERVICE_LIST
|
||||
and SERVICE_LIST[f].should_log and "encode" not in f.lower()]
|
||||
|
||||
|
||||
class TestLoggerd:
|
||||
def _get_latest_log_dir(self):
|
||||
log_dirs = sorted(Path(Paths.log_root()).iterdir(), key=lambda f: f.stat().st_mtime)
|
||||
return log_dirs[-1]
|
||||
|
||||
def _get_log_dir(self, x):
|
||||
for l in x.splitlines():
|
||||
for p in l.split(' '):
|
||||
path = Path(p.strip())
|
||||
if path.is_dir():
|
||||
return path
|
||||
return None
|
||||
|
||||
def _get_log_fn(self, x):
|
||||
for l in x.splitlines():
|
||||
for p in l.split(' '):
|
||||
path = Path(p.strip())
|
||||
if path.is_file():
|
||||
return path
|
||||
return None
|
||||
|
||||
def _gen_bootlog(self):
|
||||
with Timeout(5):
|
||||
out = subprocess.check_output("./bootlog", cwd=os.path.join(BASEDIR, "system/loggerd"), encoding='utf-8')
|
||||
|
||||
log_fn = self._get_log_fn(out)
|
||||
|
||||
# check existence
|
||||
assert log_fn is not None
|
||||
|
||||
return log_fn
|
||||
|
||||
def _check_init_data(self, msgs):
|
||||
msg = msgs[0]
|
||||
assert msg.which() == 'initData'
|
||||
|
||||
def _check_sentinel(self, msgs, route):
|
||||
start_type = SentinelType.startOfRoute if route else SentinelType.startOfSegment
|
||||
assert msgs[1].sentinel.type == start_type
|
||||
|
||||
end_type = SentinelType.endOfRoute if route else SentinelType.endOfSegment
|
||||
assert msgs[-1].sentinel.type == end_type
|
||||
|
||||
def _publish_random_messages(self, services: list[str]) -> dict[str, list]:
|
||||
pm = messaging.PubMaster(services)
|
||||
|
||||
managed_processes["loggerd"].start()
|
||||
for s in services:
|
||||
assert pm.wait_for_readers_to_update(s, timeout=5)
|
||||
|
||||
sent_msgs = defaultdict(list)
|
||||
for _ in range(random.randint(2, 10) * 100):
|
||||
for s in services:
|
||||
try:
|
||||
m = messaging.new_message(s)
|
||||
except Exception:
|
||||
m = messaging.new_message(s, random.randint(2, 10))
|
||||
pm.send(s, m)
|
||||
sent_msgs[s].append(m)
|
||||
|
||||
for s in services:
|
||||
assert pm.wait_for_readers_to_update(s, timeout=5)
|
||||
managed_processes["loggerd"].stop()
|
||||
|
||||
return sent_msgs
|
||||
|
||||
def test_init_data_values(self):
|
||||
os.environ["CLEAN"] = random.choice(["0", "1"])
|
||||
|
||||
dongle = ''.join(random.choice(string.printable) for n in range(random.randint(1, 100)))
|
||||
fake_params = [
|
||||
# param, initData field, value
|
||||
("DongleId", "dongleId", dongle),
|
||||
("GitCommit", "gitCommit", "commit"),
|
||||
("GitCommitDate", "gitCommitDate", "date"),
|
||||
("GitBranch", "gitBranch", "branch"),
|
||||
("GitRemote", "gitRemote", "remote"),
|
||||
]
|
||||
params = Params()
|
||||
for k, _, v in fake_params:
|
||||
params.put(k, v)
|
||||
params.put("AccessToken", "abc")
|
||||
|
||||
lr = list(LogReader(str(self._gen_bootlog())))
|
||||
initData = lr[0].initData
|
||||
|
||||
assert initData.dirty != bool(os.environ["CLEAN"])
|
||||
assert initData.version == get_version()
|
||||
|
||||
if os.path.isfile("/proc/cmdline"):
|
||||
with open("/proc/cmdline") as f:
|
||||
assert list(initData.kernelArgs) == f.read().strip().split(" ")
|
||||
|
||||
with open("/proc/version") as f:
|
||||
assert initData.kernelVersion == f.read()
|
||||
|
||||
# check params
|
||||
logged_params = {entry.key: entry.value for entry in initData.params.entries}
|
||||
expected_params = {k for k, _, __ in fake_params} | {'AccessToken', 'BootCount'}
|
||||
assert set(logged_params.keys()) == expected_params, set(logged_params.keys()) ^ expected_params
|
||||
assert logged_params['AccessToken'] == b'', f"DONT_LOG param value was logged: {repr(logged_params['AccessToken'])}"
|
||||
for param_key, initData_key, v in fake_params:
|
||||
assert getattr(initData, initData_key) == v
|
||||
assert logged_params[param_key].decode() == v
|
||||
|
||||
@flaky(max_runs=3)
|
||||
def test_rotation(self):
|
||||
os.environ["LOGGERD_TEST"] = "1"
|
||||
Params().put("RecordFront", "1")
|
||||
|
||||
d = DEVICE_CAMERAS[("tici", "ar0231")]
|
||||
expected_files = {"rlog", "qlog", "qcamera.ts", "fcamera.hevc", "dcamera.hevc", "ecamera.hevc"}
|
||||
streams = [(VisionStreamType.VISION_STREAM_ROAD, (d.fcam.width, d.fcam.height, 2048*2346, 2048, 2048*1216), "roadCameraState"),
|
||||
(VisionStreamType.VISION_STREAM_DRIVER, (d.dcam.width, d.dcam.height, 2048*2346, 2048, 2048*1216), "driverCameraState"),
|
||||
(VisionStreamType.VISION_STREAM_WIDE_ROAD, (d.ecam.width, d.ecam.height, 2048*2346, 2048, 2048*1216), "wideRoadCameraState")]
|
||||
|
||||
pm = messaging.PubMaster(["roadCameraState", "driverCameraState", "wideRoadCameraState"])
|
||||
vipc_server = VisionIpcServer("camerad")
|
||||
for stream_type, frame_spec, _ in streams:
|
||||
vipc_server.create_buffers_with_sizes(stream_type, 40, False, *(frame_spec))
|
||||
vipc_server.start_listener()
|
||||
|
||||
num_segs = random.randint(2, 5)
|
||||
length = random.randint(1, 3)
|
||||
os.environ["LOGGERD_SEGMENT_LENGTH"] = str(length)
|
||||
managed_processes["loggerd"].start()
|
||||
managed_processes["encoderd"].start()
|
||||
assert pm.wait_for_readers_to_update("roadCameraState", timeout=5)
|
||||
|
||||
fps = 20.0
|
||||
for n in range(1, int(num_segs*length*fps)+1):
|
||||
for stream_type, frame_spec, state in streams:
|
||||
dat = np.empty(frame_spec[2], dtype=np.uint8)
|
||||
vipc_server.send(stream_type, dat[:].flatten().tobytes(), n, n/fps, n/fps)
|
||||
|
||||
camera_state = messaging.new_message(state)
|
||||
frame = getattr(camera_state, state)
|
||||
frame.frameId = n
|
||||
pm.send(state, camera_state)
|
||||
|
||||
for _, _, state in streams:
|
||||
assert pm.wait_for_readers_to_update(state, timeout=5, dt=0.001)
|
||||
|
||||
managed_processes["loggerd"].stop()
|
||||
managed_processes["encoderd"].stop()
|
||||
|
||||
route_path = str(self._get_latest_log_dir()).rsplit("--", 1)[0]
|
||||
for n in range(num_segs):
|
||||
p = Path(f"{route_path}--{n}")
|
||||
logged = {f.name for f in p.iterdir() if f.is_file()}
|
||||
diff = logged ^ expected_files
|
||||
assert len(diff) == 0, f"didn't get all expected files. run={_} seg={n} {route_path=}, {diff=}\n{logged=} {expected_files=}"
|
||||
|
||||
def test_bootlog(self):
|
||||
# generate bootlog with fake launch log
|
||||
launch_log = ''.join(str(random.choice(string.printable)) for _ in range(100))
|
||||
with open("/tmp/launch_log", "w") as f:
|
||||
f.write(launch_log)
|
||||
|
||||
bootlog_path = self._gen_bootlog()
|
||||
lr = list(LogReader(str(bootlog_path)))
|
||||
|
||||
# check length
|
||||
assert len(lr) == 2 # boot + initData
|
||||
|
||||
self._check_init_data(lr)
|
||||
|
||||
# check msgs
|
||||
bootlog_msgs = [m for m in lr if m.which() == 'boot']
|
||||
assert len(bootlog_msgs) == 1
|
||||
|
||||
# sanity check values
|
||||
boot = bootlog_msgs.pop().boot
|
||||
assert abs(boot.wallTimeNanos - time.time_ns()) < 5*1e9 # within 5s
|
||||
assert boot.launchLog == launch_log
|
||||
|
||||
for fn in ["console-ramoops", "pmsg-ramoops-0"]:
|
||||
path = Path(os.path.join("/sys/fs/pstore/", fn))
|
||||
if path.is_file():
|
||||
with open(path, "rb") as f:
|
||||
expected_val = f.read()
|
||||
bootlog_val = [e.value for e in boot.pstore.entries if e.key == fn][0]
|
||||
assert expected_val == bootlog_val
|
||||
|
||||
# next one should increment by one
|
||||
bl1 = re.match(RE.LOG_ID_V2, bootlog_path.name)
|
||||
bl2 = re.match(RE.LOG_ID_V2, self._gen_bootlog().name)
|
||||
assert bl1.group('uid') != bl2.group('uid')
|
||||
assert int(bl1.group('count')) == 0 and int(bl2.group('count')) == 1
|
||||
|
||||
def test_qlog(self):
|
||||
qlog_services = [s for s in CEREAL_SERVICES if SERVICE_LIST[s].decimation is not None]
|
||||
no_qlog_services = [s for s in CEREAL_SERVICES if SERVICE_LIST[s].decimation is None]
|
||||
|
||||
services = random.sample(qlog_services, random.randint(2, min(10, len(qlog_services)))) + \
|
||||
random.sample(no_qlog_services, random.randint(2, min(10, len(no_qlog_services))))
|
||||
sent_msgs = self._publish_random_messages(services)
|
||||
|
||||
qlog_path = os.path.join(self._get_latest_log_dir(), "qlog")
|
||||
lr = list(LogReader(qlog_path))
|
||||
|
||||
# check initData and sentinel
|
||||
self._check_init_data(lr)
|
||||
self._check_sentinel(lr, True)
|
||||
|
||||
recv_msgs = defaultdict(list)
|
||||
for m in lr:
|
||||
recv_msgs[m.which()].append(m)
|
||||
|
||||
for s, msgs in sent_msgs.items():
|
||||
recv_cnt = len(recv_msgs[s])
|
||||
|
||||
if s in no_qlog_services:
|
||||
# check services with no specific decimation aren't in qlog
|
||||
assert recv_cnt == 0, f"got {recv_cnt} {s} msgs in qlog"
|
||||
else:
|
||||
# check logged message count matches decimation
|
||||
expected_cnt = (len(msgs) - 1) // SERVICE_LIST[s].decimation + 1
|
||||
assert recv_cnt == expected_cnt, f"expected {expected_cnt} msgs for {s}, got {recv_cnt}"
|
||||
|
||||
def test_rlog(self):
|
||||
services = random.sample(CEREAL_SERVICES, random.randint(5, 10))
|
||||
sent_msgs = self._publish_random_messages(services)
|
||||
|
||||
lr = list(LogReader(os.path.join(self._get_latest_log_dir(), "rlog")))
|
||||
|
||||
# check initData and sentinel
|
||||
self._check_init_data(lr)
|
||||
self._check_sentinel(lr, True)
|
||||
|
||||
# check all messages were logged and in order
|
||||
lr = lr[2:-1] # slice off initData and both sentinels
|
||||
for m in lr:
|
||||
sent = sent_msgs[m.which()].pop(0)
|
||||
sent.clear_write_flag()
|
||||
assert sent.to_bytes() == m.as_builder().to_bytes()
|
||||
|
||||
def test_preserving_flagged_segments(self):
|
||||
services = set(random.sample(CEREAL_SERVICES, random.randint(5, 10))) | {"userFlag"}
|
||||
self._publish_random_messages(services)
|
||||
|
||||
segment_dir = self._get_latest_log_dir()
|
||||
assert getxattr(segment_dir, PRESERVE_ATTR_NAME) == PRESERVE_ATTR_VALUE
|
||||
|
||||
def test_not_preserving_unflagged_segments(self):
|
||||
services = set(random.sample(CEREAL_SERVICES, random.randint(5, 10))) - {"userFlag"}
|
||||
self._publish_random_messages(services)
|
||||
|
||||
segment_dir = self._get_latest_log_dir()
|
||||
assert getxattr(segment_dir, PRESERVE_ATTR_NAME) is None
|
||||
|
||||
2
system/loggerd/tests/test_runner.cc
Normal file
2
system/loggerd/tests/test_runner.cc
Normal file
@@ -0,0 +1,2 @@
|
||||
#define CATCH_CONFIG_MAIN
|
||||
#include "catch2/catch.hpp"
|
||||
190
system/loggerd/tests/test_uploader.py
Normal file
190
system/loggerd/tests/test_uploader.py
Normal file
@@ -0,0 +1,190 @@
|
||||
#!/usr/bin/env python3
|
||||
import os
|
||||
import time
|
||||
import threading
|
||||
import unittest
|
||||
import logging
|
||||
import json
|
||||
from pathlib import Path
|
||||
from openpilot.system.hardware.hw import Paths
|
||||
|
||||
from openpilot.common.swaglog import cloudlog
|
||||
from openpilot.system.loggerd.uploader import main, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE
|
||||
|
||||
from openpilot.system.loggerd.tests.loggerd_tests_common import UploaderTestCase
|
||||
|
||||
|
||||
class FakeLogHandler(logging.Handler):
|
||||
def __init__(self):
|
||||
logging.Handler.__init__(self)
|
||||
self.reset()
|
||||
|
||||
def reset(self):
|
||||
self.upload_order = list()
|
||||
self.upload_ignored = list()
|
||||
|
||||
def emit(self, record):
|
||||
try:
|
||||
j = json.loads(record.getMessage())
|
||||
if j["event"] == "upload_success":
|
||||
self.upload_order.append(j["key"])
|
||||
if j["event"] == "upload_ignored":
|
||||
self.upload_ignored.append(j["key"])
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
log_handler = FakeLogHandler()
|
||||
cloudlog.addHandler(log_handler)
|
||||
|
||||
|
||||
class TestUploader(UploaderTestCase):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
log_handler.reset()
|
||||
|
||||
def start_thread(self):
|
||||
self.end_event = threading.Event()
|
||||
self.up_thread = threading.Thread(target=main, args=[self.end_event])
|
||||
self.up_thread.daemon = True
|
||||
self.up_thread.start()
|
||||
|
||||
def join_thread(self):
|
||||
self.end_event.set()
|
||||
self.up_thread.join()
|
||||
|
||||
def gen_files(self, lock=False, xattr: bytes = None, boot=True) -> list[Path]:
|
||||
f_paths = []
|
||||
for t in ["qlog", "rlog", "dcamera.hevc", "fcamera.hevc"]:
|
||||
f_paths.append(self.make_file_with_data(self.seg_dir, t, 1, lock=lock, upload_xattr=xattr))
|
||||
|
||||
if boot:
|
||||
f_paths.append(self.make_file_with_data("boot", f"{self.seg_dir}", 1, lock=lock, upload_xattr=xattr))
|
||||
return f_paths
|
||||
|
||||
def gen_order(self, seg1: list[int], seg2: list[int], boot=True) -> list[str]:
|
||||
keys = []
|
||||
if boot:
|
||||
keys += [f"boot/{self.seg_format.format(i)}.bz2" for i in seg1]
|
||||
keys += [f"boot/{self.seg_format2.format(i)}.bz2" for i in seg2]
|
||||
keys += [f"{self.seg_format.format(i)}/qlog.bz2" for i in seg1]
|
||||
keys += [f"{self.seg_format2.format(i)}/qlog.bz2" for i in seg2]
|
||||
return keys
|
||||
|
||||
def test_upload(self):
|
||||
self.gen_files(lock=False)
|
||||
|
||||
self.start_thread()
|
||||
# allow enough time that files could upload twice if there is a bug in the logic
|
||||
time.sleep(5)
|
||||
self.join_thread()
|
||||
|
||||
exp_order = self.gen_order([self.seg_num], [])
|
||||
|
||||
self.assertTrue(len(log_handler.upload_ignored) == 0, "Some files were ignored")
|
||||
self.assertFalse(len(log_handler.upload_order) < len(exp_order), "Some files failed to upload")
|
||||
self.assertFalse(len(log_handler.upload_order) > len(exp_order), "Some files were uploaded twice")
|
||||
for f_path in exp_order:
|
||||
self.assertEqual(os.getxattr((Path(Paths.log_root()) / f_path).with_suffix(""), UPLOAD_ATTR_NAME), UPLOAD_ATTR_VALUE, "All files not uploaded")
|
||||
|
||||
self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order")
|
||||
|
||||
def test_upload_with_wrong_xattr(self):
|
||||
self.gen_files(lock=False, xattr=b'0')
|
||||
|
||||
self.start_thread()
|
||||
# allow enough time that files could upload twice if there is a bug in the logic
|
||||
time.sleep(5)
|
||||
self.join_thread()
|
||||
|
||||
exp_order = self.gen_order([self.seg_num], [])
|
||||
|
||||
self.assertTrue(len(log_handler.upload_ignored) == 0, "Some files were ignored")
|
||||
self.assertFalse(len(log_handler.upload_order) < len(exp_order), "Some files failed to upload")
|
||||
self.assertFalse(len(log_handler.upload_order) > len(exp_order), "Some files were uploaded twice")
|
||||
for f_path in exp_order:
|
||||
self.assertEqual(os.getxattr((Path(Paths.log_root()) / f_path).with_suffix(""), UPLOAD_ATTR_NAME), UPLOAD_ATTR_VALUE, "All files not uploaded")
|
||||
|
||||
self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order")
|
||||
|
||||
def test_upload_ignored(self):
|
||||
self.set_ignore()
|
||||
self.gen_files(lock=False)
|
||||
|
||||
self.start_thread()
|
||||
# allow enough time that files could upload twice if there is a bug in the logic
|
||||
time.sleep(5)
|
||||
self.join_thread()
|
||||
|
||||
exp_order = self.gen_order([self.seg_num], [])
|
||||
|
||||
self.assertTrue(len(log_handler.upload_order) == 0, "Some files were not ignored")
|
||||
self.assertFalse(len(log_handler.upload_ignored) < len(exp_order), "Some files failed to ignore")
|
||||
self.assertFalse(len(log_handler.upload_ignored) > len(exp_order), "Some files were ignored twice")
|
||||
for f_path in exp_order:
|
||||
self.assertEqual(os.getxattr((Path(Paths.log_root()) / f_path).with_suffix(""), UPLOAD_ATTR_NAME), UPLOAD_ATTR_VALUE, "All files not ignored")
|
||||
|
||||
self.assertTrue(log_handler.upload_ignored == exp_order, "Files ignored in wrong order")
|
||||
|
||||
def test_upload_files_in_create_order(self):
|
||||
seg1_nums = [0, 1, 2, 10, 20]
|
||||
for i in seg1_nums:
|
||||
self.seg_dir = self.seg_format.format(i)
|
||||
self.gen_files(boot=False)
|
||||
seg2_nums = [5, 50, 51]
|
||||
for i in seg2_nums:
|
||||
self.seg_dir = self.seg_format2.format(i)
|
||||
self.gen_files(boot=False)
|
||||
|
||||
exp_order = self.gen_order(seg1_nums, seg2_nums, boot=False)
|
||||
|
||||
self.start_thread()
|
||||
# allow enough time that files could upload twice if there is a bug in the logic
|
||||
time.sleep(5)
|
||||
self.join_thread()
|
||||
|
||||
self.assertTrue(len(log_handler.upload_ignored) == 0, "Some files were ignored")
|
||||
self.assertFalse(len(log_handler.upload_order) < len(exp_order), "Some files failed to upload")
|
||||
self.assertFalse(len(log_handler.upload_order) > len(exp_order), "Some files were uploaded twice")
|
||||
for f_path in exp_order:
|
||||
self.assertEqual(os.getxattr((Path(Paths.log_root()) / f_path).with_suffix(""), UPLOAD_ATTR_NAME), UPLOAD_ATTR_VALUE, "All files not uploaded")
|
||||
|
||||
self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order")
|
||||
|
||||
def test_no_upload_with_lock_file(self):
|
||||
self.start_thread()
|
||||
|
||||
time.sleep(0.25)
|
||||
f_paths = self.gen_files(lock=True, boot=False)
|
||||
|
||||
# allow enough time that files should have been uploaded if they would be uploaded
|
||||
time.sleep(5)
|
||||
self.join_thread()
|
||||
|
||||
for f_path in f_paths:
|
||||
fn = f_path.with_suffix(f_path.suffix.replace(".bz2", ""))
|
||||
uploaded = UPLOAD_ATTR_NAME in os.listxattr(fn) and os.getxattr(fn, UPLOAD_ATTR_NAME) == UPLOAD_ATTR_VALUE
|
||||
self.assertFalse(uploaded, "File upload when locked")
|
||||
|
||||
def test_no_upload_with_xattr(self):
|
||||
self.gen_files(lock=False, xattr=UPLOAD_ATTR_VALUE)
|
||||
|
||||
self.start_thread()
|
||||
# allow enough time that files could upload twice if there is a bug in the logic
|
||||
time.sleep(5)
|
||||
self.join_thread()
|
||||
|
||||
self.assertEqual(len(log_handler.upload_order), 0, "File uploaded again")
|
||||
|
||||
def test_clear_locks_on_startup(self):
|
||||
f_paths = self.gen_files(lock=True, boot=False)
|
||||
self.start_thread()
|
||||
time.sleep(1)
|
||||
self.join_thread()
|
||||
|
||||
for f_path in f_paths:
|
||||
lock_path = f_path.with_suffix(f_path.suffix + ".lock")
|
||||
self.assertFalse(lock_path.is_file(), "File lock not cleared on startup")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -9,10 +9,12 @@ import threading
|
||||
import time
|
||||
import traceback
|
||||
import datetime
|
||||
from typing import BinaryIO, Iterator, List, Optional, Tuple
|
||||
from typing import BinaryIO
|
||||
from collections.abc import Iterator
|
||||
|
||||
from cereal import log
|
||||
import cereal.messaging as messaging
|
||||
import openpilot.selfdrive.sentry as sentry
|
||||
from openpilot.common.api import Api
|
||||
from openpilot.common.params import Params
|
||||
from openpilot.common.realtime import set_core_affinity
|
||||
@@ -42,10 +44,12 @@ class FakeResponse:
|
||||
self.request = FakeRequest()
|
||||
|
||||
|
||||
def get_directory_sort(d: str) -> List[str]:
|
||||
return [s.rjust(10, '0') for s in d.rsplit('--', 1)]
|
||||
def get_directory_sort(d: str) -> list[str]:
|
||||
# ensure old format is sorted sooner
|
||||
o = ["0", ] if d.startswith("2024-") else ["1", ]
|
||||
return o + [s.rjust(10, '0') for s in d.rsplit('--', 1)]
|
||||
|
||||
def listdir_by_creation(d: str) -> List[str]:
|
||||
def listdir_by_creation(d: str) -> list[str]:
|
||||
if not os.path.isdir(d):
|
||||
return []
|
||||
|
||||
@@ -82,7 +86,7 @@ class Uploader:
|
||||
self.immediate_folders = ["crash/", "boot/"]
|
||||
self.immediate_priority = {"qlog": 0, "qlog.bz2": 0, "qcamera.ts": 1}
|
||||
|
||||
def list_upload_files(self, metered: bool) -> Iterator[Tuple[str, str, str]]:
|
||||
def list_upload_files(self, metered: bool) -> Iterator[tuple[str, str, str]]:
|
||||
r = self.params.get("AthenadRecentlyViewedRoutes", encoding="utf8")
|
||||
requested_routes = [] if r is None else r.split(",")
|
||||
|
||||
@@ -121,7 +125,7 @@ class Uploader:
|
||||
|
||||
yield name, key, fn
|
||||
|
||||
def next_file_to_upload(self, metered: bool) -> Optional[Tuple[str, str, str]]:
|
||||
def next_file_to_upload(self, metered: bool) -> tuple[str, str, str] | None:
|
||||
upload_files = list(self.list_upload_files(metered))
|
||||
|
||||
for name, key, fn in upload_files:
|
||||
@@ -207,7 +211,7 @@ class Uploader:
|
||||
return success
|
||||
|
||||
|
||||
def step(self, network_type: int, metered: bool) -> Optional[bool]:
|
||||
def step(self, network_type: int, metered: bool) -> bool | None:
|
||||
d = self.next_file_to_upload(metered)
|
||||
if d is None:
|
||||
return None
|
||||
@@ -221,7 +225,7 @@ class Uploader:
|
||||
return self.upload(name, key, fn, network_type, metered)
|
||||
|
||||
|
||||
def main(exit_event: Optional[threading.Event] = None) -> None:
|
||||
def main(exit_event: threading.Event = None) -> None:
|
||||
if exit_event is None:
|
||||
exit_event = threading.Event()
|
||||
|
||||
@@ -247,7 +251,9 @@ def main(exit_event: Optional[threading.Event] = None) -> None:
|
||||
sm.update(0)
|
||||
offroad = params.get_bool("IsOffroad")
|
||||
network_type = sm['deviceState'].networkType if not force_wifi else NetworkType.wifi
|
||||
if network_type == NetworkType.none:
|
||||
at_home = not params.get_bool("DisableOnroadUploads") or offroad and network_type in (NetworkType.ethernet, NetworkType.wifi)
|
||||
openpilot_crashed = os.path.isfile(os.path.join(sentry.CRASHES_DIR, 'error.txt'))
|
||||
if network_type == NetworkType.none or not at_home or openpilot_crashed:
|
||||
if allow_sleep:
|
||||
time.sleep(60 if offroad else 5)
|
||||
continue
|
||||
|
||||
112
system/loggerd/video_writer.cc
Normal file
112
system/loggerd/video_writer.cc
Normal file
@@ -0,0 +1,112 @@
|
||||
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
|
||||
#include <cassert>
|
||||
|
||||
#include "system/loggerd/video_writer.h"
|
||||
#include "common/swaglog.h"
|
||||
#include "common/util.h"
|
||||
|
||||
VideoWriter::VideoWriter(const char *path, const char *filename, bool remuxing, int width, int height, int fps, cereal::EncodeIndex::Type codec)
|
||||
: remuxing(remuxing) {
|
||||
vid_path = util::string_format("%s/%s", path, filename);
|
||||
lock_path = util::string_format("%s/%s.lock", path, filename);
|
||||
|
||||
int lock_fd = HANDLE_EINTR(open(lock_path.c_str(), O_RDWR | O_CREAT, 0664));
|
||||
assert(lock_fd >= 0);
|
||||
close(lock_fd);
|
||||
|
||||
LOGD("encoder_open %s remuxing:%d", this->vid_path.c_str(), this->remuxing);
|
||||
if (this->remuxing) {
|
||||
bool raw = (codec == cereal::EncodeIndex::Type::BIG_BOX_LOSSLESS);
|
||||
avformat_alloc_output_context2(&this->ofmt_ctx, NULL, raw ? "matroska" : NULL, this->vid_path.c_str());
|
||||
assert(this->ofmt_ctx);
|
||||
|
||||
// set codec correctly. needed?
|
||||
assert(codec != cereal::EncodeIndex::Type::FULL_H_E_V_C);
|
||||
const AVCodec *avcodec = avcodec_find_encoder(raw ? AV_CODEC_ID_FFVHUFF : AV_CODEC_ID_H264);
|
||||
assert(avcodec);
|
||||
|
||||
this->codec_ctx = avcodec_alloc_context3(avcodec);
|
||||
assert(this->codec_ctx);
|
||||
this->codec_ctx->width = width;
|
||||
this->codec_ctx->height = height;
|
||||
this->codec_ctx->pix_fmt = AV_PIX_FMT_YUV420P;
|
||||
this->codec_ctx->time_base = (AVRational){ 1, fps };
|
||||
|
||||
if (codec == cereal::EncodeIndex::Type::BIG_BOX_LOSSLESS) {
|
||||
// without this, there's just noise
|
||||
int err = avcodec_open2(this->codec_ctx, avcodec, NULL);
|
||||
assert(err >= 0);
|
||||
}
|
||||
|
||||
this->out_stream = avformat_new_stream(this->ofmt_ctx, raw ? avcodec : NULL);
|
||||
assert(this->out_stream);
|
||||
|
||||
int err = avio_open(&this->ofmt_ctx->pb, this->vid_path.c_str(), AVIO_FLAG_WRITE);
|
||||
assert(err >= 0);
|
||||
|
||||
} else {
|
||||
this->of = util::safe_fopen(this->vid_path.c_str(), "wb");
|
||||
assert(this->of);
|
||||
}
|
||||
}
|
||||
|
||||
void VideoWriter::write(uint8_t *data, int len, long long timestamp, bool codecconfig, bool keyframe) {
|
||||
if (of && data) {
|
||||
size_t written = util::safe_fwrite(data, 1, len, of);
|
||||
if (written != len) {
|
||||
LOGE("failed to write file.errno=%d", errno);
|
||||
}
|
||||
}
|
||||
|
||||
if (remuxing) {
|
||||
if (codecconfig) {
|
||||
if (len > 0) {
|
||||
codec_ctx->extradata = (uint8_t*)av_mallocz(len + AV_INPUT_BUFFER_PADDING_SIZE);
|
||||
codec_ctx->extradata_size = len;
|
||||
memcpy(codec_ctx->extradata, data, len);
|
||||
}
|
||||
int err = avcodec_parameters_from_context(out_stream->codecpar, codec_ctx);
|
||||
assert(err >= 0);
|
||||
err = avformat_write_header(ofmt_ctx, NULL);
|
||||
assert(err >= 0);
|
||||
} else {
|
||||
// input timestamps are in microseconds
|
||||
AVRational in_timebase = {1, 1000000};
|
||||
|
||||
AVPacket pkt;
|
||||
av_init_packet(&pkt);
|
||||
pkt.data = data;
|
||||
pkt.size = len;
|
||||
|
||||
enum AVRounding rnd = static_cast<enum AVRounding>(AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX);
|
||||
pkt.pts = pkt.dts = av_rescale_q_rnd(timestamp, in_timebase, ofmt_ctx->streams[0]->time_base, rnd);
|
||||
pkt.duration = av_rescale_q(50*1000, in_timebase, ofmt_ctx->streams[0]->time_base);
|
||||
|
||||
if (keyframe) {
|
||||
pkt.flags |= AV_PKT_FLAG_KEY;
|
||||
}
|
||||
|
||||
// TODO: can use av_write_frame for non raw?
|
||||
int err = av_interleaved_write_frame(ofmt_ctx, &pkt);
|
||||
if (err < 0) { LOGW("ts encoder write issue len: %d ts: %lld", len, timestamp); }
|
||||
|
||||
av_packet_unref(&pkt);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
VideoWriter::~VideoWriter() {
|
||||
if (this->remuxing) {
|
||||
int err = av_write_trailer(this->ofmt_ctx);
|
||||
if (err != 0) LOGE("av_write_trailer failed %d", err);
|
||||
avcodec_free_context(&this->codec_ctx);
|
||||
err = avio_closep(&this->ofmt_ctx->pb);
|
||||
if (err != 0) LOGE("avio_closep failed %d", err);
|
||||
avformat_free_context(this->ofmt_ctx);
|
||||
} else {
|
||||
util::safe_fflush(this->of);
|
||||
fclose(this->of);
|
||||
this->of = nullptr;
|
||||
}
|
||||
unlink(this->lock_path.c_str());
|
||||
}
|
||||
25
system/loggerd/video_writer.h
Normal file
25
system/loggerd/video_writer.h
Normal file
@@ -0,0 +1,25 @@
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
|
||||
extern "C" {
|
||||
#include <libavformat/avformat.h>
|
||||
#include <libavcodec/avcodec.h>
|
||||
}
|
||||
|
||||
#include "cereal/messaging/messaging.h"
|
||||
|
||||
class VideoWriter {
|
||||
public:
|
||||
VideoWriter(const char *path, const char *filename, bool remuxing, int width, int height, int fps, cereal::EncodeIndex::Type codec);
|
||||
void write(uint8_t *data, int len, long long timestamp, bool codecconfig, bool keyframe);
|
||||
~VideoWriter();
|
||||
private:
|
||||
std::string vid_path, lock_path;
|
||||
FILE *of = nullptr;
|
||||
|
||||
AVCodecContext *codec_ctx;
|
||||
AVFormatContext *ofmt_ctx;
|
||||
AVStream *out_stream;
|
||||
bool remuxing;
|
||||
};
|
||||
@@ -1,10 +1,9 @@
|
||||
import os
|
||||
import errno
|
||||
from typing import Dict, Optional, Tuple
|
||||
|
||||
_cached_attributes: Dict[Tuple, Optional[bytes]] = {}
|
||||
_cached_attributes: dict[tuple, bytes | None] = {}
|
||||
|
||||
def getxattr(path: str, attr_name: str) -> Optional[bytes]:
|
||||
def getxattr(path: str, attr_name: str) -> bytes | None:
|
||||
key = (path, attr_name)
|
||||
if key not in _cached_attributes:
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user