Skip to content

Commit

Permalink
replay: bug fixes and improvements (commaai#32193)
Browse files Browse the repository at this point in the history
  • Loading branch information
deanlee authored Apr 22, 2024
1 parent 099e31a commit 2c409e0
Show file tree
Hide file tree
Showing 12 changed files with 252 additions and 299 deletions.
9 changes: 4 additions & 5 deletions tools/cabana/streams/replaystream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,12 @@ void ReplayStream::mergeSegments() {

std::vector<const CanEvent *> new_events;
new_events.reserve(seg->log->events.size());
for (auto it = seg->log->events.cbegin(); it != seg->log->events.cend(); ++it) {
if ((*it)->which == cereal::Event::Which::CAN) {
const uint64_t ts = (*it)->mono_time;
capnp::FlatArrayMessageReader reader((*it)->data);
for (const Event &e : seg->log->events) {
if (e.which == cereal::Event::Which::CAN) {
capnp::FlatArrayMessageReader reader(e.data);
auto event = reader.getRoot<cereal::Event>();
for (const auto &c : event.getCan()) {
new_events.push_back(newEvent(ts, c));
new_events.push_back(newEvent(e.mono_time, c));
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions tools/cabana/videowidget.cc
Original file line number Diff line number Diff line change
Expand Up @@ -257,27 +257,27 @@ void Slider::setTimeRange(double min, double max) {
void Slider::parseQLog(int segnum, std::shared_ptr<LogReader> qlog) {
const auto &segments = qobject_cast<ReplayStream *>(can)->route()->segments();
if (segments.size() > 0 && segnum == segments.rbegin()->first && !qlog->events.empty()) {
emit updateMaximumTime(qlog->events.back()->mono_time / 1e9 - can->routeStartTime());
emit updateMaximumTime(qlog->events.back().mono_time / 1e9 - can->routeStartTime());
}

std::mutex mutex;
QtConcurrent::blockingMap(qlog->events.cbegin(), qlog->events.cend(), [&mutex, this](const Event *e) {
if (e->which == cereal::Event::Which::THUMBNAIL) {
capnp::FlatArrayMessageReader reader(e->data);
QtConcurrent::blockingMap(qlog->events.cbegin(), qlog->events.cend(), [&mutex, this](const Event &e) {
if (e.which == cereal::Event::Which::THUMBNAIL) {
capnp::FlatArrayMessageReader reader(e.data);
auto thumb = reader.getRoot<cereal::Event>().getThumbnail();
auto data = thumb.getThumbnail();
if (QPixmap pm; pm.loadFromData(data.begin(), data.size(), "jpeg")) {
QPixmap scaled = pm.scaledToHeight(MIN_VIDEO_HEIGHT - THUMBNAIL_MARGIN * 2, Qt::SmoothTransformation);
std::lock_guard lk(mutex);
thumbnails[thumb.getTimestampEof()] = scaled;
}
} else if (e->which == cereal::Event::Which::CONTROLS_STATE) {
capnp::FlatArrayMessageReader reader(e->data);
} else if (e.which == cereal::Event::Which::CONTROLS_STATE) {
capnp::FlatArrayMessageReader reader(e.data);
auto cs = reader.getRoot<cereal::Event>().getControlsState();
if (cs.getAlertType().size() > 0 && cs.getAlertText1().size() > 0 &&
cs.getAlertSize() != cereal::ControlsState::AlertSize::NONE) {
std::lock_guard lk(mutex);
alerts.emplace(e->mono_time, AlertInfo{cs.getAlertStatus(), cs.getAlertText1().cStr(), cs.getAlertText2().cStr()});
alerts.emplace(e.mono_time, AlertInfo{cs.getAlertStatus(), cs.getAlertText1().cStr(), cs.getAlertText2().cStr()});
}
}
});
Expand Down
3 changes: 1 addition & 2 deletions tools/replay/consoleui.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ void ConsoleUI::updateStatus() {
if (status != Status::Paused) {
auto events = replay->events();
uint64_t current_mono_time = replay->routeStartTime() + replay->currentSeconds() * 1e9;
bool playing = !events->empty() && events->back()->mono_time > current_mono_time;
bool playing = !events->empty() && events->back().mono_time > current_mono_time;
status = playing ? Status::Playing : Status::Waiting;
}
auto [status_str, status_color] = status_text[status];
Expand Down Expand Up @@ -368,7 +368,6 @@ void ConsoleUI::handleKey(char c) {
} else if (c == ' ') {
pauseReplay(!replay->isPaused());
} else if (c == 'q' || c == 'Q') {
replay->stop();
qApp->exit();
}
}
5 changes: 4 additions & 1 deletion tools/replay/filereader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ std::string FileReader::read(const std::string &file, std::atomic<bool> *abort)

std::string FileReader::download(const std::string &url, std::atomic<bool> *abort) {
for (int i = 0; i <= max_retries_ && !(abort && *abort); ++i) {
if (i > 0) rWarning("download failed, retrying %d", i);
if (i > 0) {
rWarning("download failed, retrying %d", i);
util::sleep_for(3000);
}

std::string result = httpGet(url, chunk_size_, abort);
if (!result.empty()) {
Expand Down
43 changes: 11 additions & 32 deletions tools/replay/logreader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,6 @@
#include "tools/replay/filereader.h"
#include "tools/replay/util.h"

LogReader::LogReader(size_t memory_pool_block_size) {
events.reserve(memory_pool_block_size);
}

LogReader::~LogReader() {
for (Event *e : events) {
delete e;
}
}

bool LogReader::load(const std::string &url, std::atomic<bool> *abort, bool local_cache, int chunk_size, int retries) {
raw_ = FileReader(local_cache, chunk_size, retries).read(url, abort);
if (raw_.empty()) return false;
Expand All @@ -22,34 +12,30 @@ bool LogReader::load(const std::string &url, std::atomic<bool> *abort, bool loca
raw_ = decompressBZ2(raw_, abort);
if (raw_.empty()) return false;
}
return parse(abort);
return load(raw_.data(), raw_.size(), abort);
}

bool LogReader::load(const std::byte *data, size_t size, std::atomic<bool> *abort) {
raw_.assign((const char *)data, size);
return parse(abort);
}

bool LogReader::parse(std::atomic<bool> *abort) {
bool LogReader::load(const char *data, size_t size, std::atomic<bool> *abort) {
try {
kj::ArrayPtr<const capnp::word> words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word));
events.reserve(65000);
kj::ArrayPtr<const capnp::word> words((const capnp::word *)data, size / sizeof(capnp::word));
while (words.size() > 0 && !(abort && *abort)) {
capnp::FlatArrayMessageReader reader(words);
auto event = reader.getRoot<cereal::Event>();
auto which = event.which();
uint64_t mono_time = event.getLogMonoTime();
auto event_data = kj::arrayPtr(words.begin(), reader.getEnd());

Event *evt = events.emplace_back(newEvent(which, mono_time, event_data));
const Event &evt = events.emplace_back(which, mono_time, event_data);
// Add encodeIdx packet again as a frame packet for the video stream
if (evt->which == cereal::Event::ROAD_ENCODE_IDX ||
evt->which == cereal::Event::DRIVER_ENCODE_IDX ||
evt->which == cereal::Event::WIDE_ROAD_ENCODE_IDX) {
if (evt.which == cereal::Event::ROAD_ENCODE_IDX ||
evt.which == cereal::Event::DRIVER_ENCODE_IDX ||
evt.which == cereal::Event::WIDE_ROAD_ENCODE_IDX) {
auto idx = capnp::AnyStruct::Reader(event).getPointerSection()[0].getAs<cereal::EncodeIndex>();
if (uint64_t sof = idx.getTimestampSof()) {
mono_time = sof;
}
events.emplace_back(newEvent(which, mono_time, event_data, idx.getSegmentNum()));
events.emplace_back(which, mono_time, event_data, idx.getSegmentNum());
}

words = kj::arrayPtr(reader.getEnd(), words.end());
Expand All @@ -59,16 +45,9 @@ bool LogReader::parse(std::atomic<bool> *abort) {
}

if (!events.empty() && !(abort && *abort)) {
std::sort(events.begin(), events.end(), Event::lessThan());
events.shrink_to_fit();
std::sort(events.begin(), events.end());
return true;
}
return false;
}

Event *LogReader::newEvent(cereal::Event::Which which, uint64_t mono_time, const kj::ArrayPtr<const capnp::word> &words, int eidx_segnum) {
#ifdef HAS_MEMORY_RESOURCE
return new (&mbr_) Event(which, mono_time, words, eidx_segnum);
#else
return new Event(which, mono_time, words, eidx_segnum);
#endif
}
32 changes: 4 additions & 28 deletions tools/replay/logreader.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
#pragma once

#if __has_include(<memory_resource>)
#define HAS_MEMORY_RESOURCE 1
#include <memory_resource>
#endif
#include <memory>
#include <string>
#include <vector>

Expand All @@ -13,27 +8,15 @@

const CameraType ALL_CAMERAS[] = {RoadCam, DriverCam, WideRoadCam};
const int MAX_CAMERAS = std::size(ALL_CAMERAS);
const int DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE = 65000;

class Event {
public:
Event(cereal::Event::Which which, uint64_t mono_time, const kj::ArrayPtr<const capnp::word> &data, int eidx_segnum = -1)
: which(which), mono_time(mono_time), data(data), eidx_segnum(eidx_segnum) {}

struct lessThan {
inline bool operator()(const Event *l, const Event *r) {
return l->mono_time < r->mono_time || (l->mono_time == r->mono_time && l->which < r->which);
}
};

#if HAS_MEMORY_RESOURCE
void *operator new(size_t size, std::pmr::monotonic_buffer_resource *mbr) {
return mbr->allocate(size);
}
void operator delete(void *ptr) {
// No-op. memory used by EventMemoryPool increases monotonically until the logReader is destroyed.
bool operator<(const Event &other) const {
return mono_time < other.mono_time || (mono_time == other.mono_time && which < other.which);
}
#endif

uint64_t mono_time;
cereal::Event::Which which;
Expand All @@ -43,18 +26,11 @@ class Event {

class LogReader {
public:
LogReader(size_t memory_pool_block_size = DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE);
~LogReader();
bool load(const std::string &url, std::atomic<bool> *abort = nullptr,
bool local_cache = false, int chunk_size = -1, int retries = 0);
bool load(const std::byte *data, size_t size, std::atomic<bool> *abort = nullptr);
std::vector<Event*> events;
bool load(const char *data, size_t size, std::atomic<bool> *abort = nullptr);
std::vector<Event> events;

private:
Event *newEvent(cereal::Event::Which which, uint64_t mono_time, const kj::ArrayPtr<const capnp::word> &words, int eidx_segnum = -1);
bool parse(std::atomic<bool> *abort);
std::string raw_;
#ifdef HAS_MEMORY_RESOURCE
std::pmr::monotonic_buffer_resource mbr_{DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE * sizeof(Event)};
#endif
};
Loading

0 comments on commit 2c409e0

Please sign in to comment.