diff --git a/can/SConscript b/can/SConscript index f07234c111..41d1cdb924 100644 --- a/can/SConscript +++ b/can/SConscript @@ -1,4 +1,4 @@ -Import('env', 'envCython', 'cereal', 'common') +Import('env', 'envCython', 'common') import os @@ -6,7 +6,7 @@ envDBC = env.Clone() dbc_file_path = '-DDBC_FILE_PATH=\'"%s"\'' % (envDBC.Dir("..").abspath) envDBC['CXXFLAGS'] += [dbc_file_path] src = ["dbc.cc", "parser.cc", "packer.cc", "common.cc"] -libs = [common, "capnp", "kj", "zmq"] +libs = [common, "zmq"] # shared library for openpilot libdbc = envDBC.SharedLibrary('libdbc', src, LIBS=libs) diff --git a/can/common.h b/can/common.h index 03b99e5598..5371ad0cf8 100644 --- a/can/common.h +++ b/can/common.h @@ -6,13 +6,6 @@ #include #include -#include -#include - -#ifndef DYNAMIC_CAPNP -#include "cereal/gen/cpp/log.capnp.h" -#endif - #include "opendbc/can/common_dbc.h" #define INFO printf @@ -58,11 +51,16 @@ class MessageState { bool update_counter_generic(int64_t v, int cnt_size); }; +struct CanFrame { + uint64_t ts; + long src; + long address; + std::vector dat; +}; + class CANParser { private: const int bus; - kj::Array aligned_buf; - const DBC *dbc = NULL; std::unordered_map message_states; @@ -70,7 +68,6 @@ class CANParser { bool can_valid = false; bool bus_timeout = false; uint64_t first_sec = 0; - uint64_t last_sec = 0; uint64_t last_nonempty_sec = 0; uint64_t bus_timeout_threshold = 0; uint64_t can_invalid_cnt = CAN_INVALID_CNT; @@ -78,14 +75,10 @@ class CANParser { CANParser(int abus, const std::string& dbc_name, const std::vector> &messages); CANParser(int abus, const std::string& dbc_name, bool ignore_checksum, bool ignore_counter); - #ifndef DYNAMIC_CAPNP - void update_string(const std::string &data, bool sendcan); - void update_strings(const std::vector &data, std::vector &vals, bool sendcan); - void UpdateCans(uint64_t sec, const capnp::List::Reader& cans); - #endif - void UpdateCans(uint64_t sec, const capnp::DynamicStruct::Reader& cans); + void update_frames(uint64_t frame_fist_sec, const std::vector &frames, std::vector &vals); + bool updateFrame(const CanFrame &data); void UpdateValid(uint64_t sec); - void query_latest(std::vector &vals, uint64_t last_ts = 0); + void query_latest(std::vector &vals, uint64_t frame_fist_sec = 0); }; class CANPacker { diff --git a/can/common.pxd b/can/common.pxd index 477f71b2d3..45c780850e 100644 --- a/can/common.pxd +++ b/can/common.pxd @@ -64,11 +64,17 @@ cdef extern from "common_dbc.h": cdef extern from "common.h": cdef const DBC* dbc_lookup(const string) + cdef struct CanFrame: + uint64_t ts + long src + long address + vector[uint8_t] dat + cdef cppclass CANParser: bool can_valid bool bus_timeout CANParser(int, string, vector[pair[uint32_t, int]]) except + - void update_strings(vector[string]&, vector[SignalValue]&, bool) except + + void update_frames(uint64_t sec, vector[CanFrame]&, vector[SignalValue]&) except + cdef cppclass CANPacker: CANPacker(string) diff --git a/can/parser.cc b/can/parser.cc index bf2406ec3b..20ba84285c 100644 --- a/can/parser.cc +++ b/can/parser.cc @@ -1,14 +1,8 @@ #include #include -#include #include -#include #include - -#include -#include -#include -#include +#include #include "cereal/logger/logger.h" #include "opendbc/can/common.h" @@ -91,8 +85,7 @@ bool MessageState::update_counter_generic(int64_t v, int cnt_size) { } -CANParser::CANParser(int abus, const std::string& dbc_name, const std::vector> &messages) - : bus(abus), aligned_buf(kj::heapArray(1024)) { +CANParser::CANParser(int abus, const std::string& dbc_name, const std::vector> &messages) : bus(abus) { dbc = dbc_lookup(dbc_name); assert(dbc); init_crc_lookup_tables(); @@ -101,7 +94,7 @@ CANParser::CANParser(int abus, const std::string& dbc_name, const std::vector(buf_size); - } - memcpy(aligned_buf.begin(), data.data(), data.length()); - - // extract the messages - capnp::FlatArrayMessageReader cmsg(aligned_buf.slice(0, buf_size)); - cereal::Event::Reader event = cmsg.getRoot(); - +void CANParser::update_frames(uint64_t frame_first_sec, const std::vector &frames, std::vector &vals) { if (first_sec == 0) { - first_sec = event.getLogMonoTime(); + first_sec = frame_first_sec; } - last_sec = event.getLogMonoTime(); - - auto cans = sendcan ? event.getSendcan() : event.getCan(); - UpdateCans(last_sec, cans); - - UpdateValid(last_sec); -} - -void CANParser::update_strings(const std::vector &data, std::vector &vals, bool sendcan) { - uint64_t current_sec = 0; - for (const auto &d : data) { - update_string(d, sendcan); - if (current_sec == 0) { - current_sec = last_sec; + uint64_t last_sec = frame_first_sec; + for (const auto &f : frames) { + if (updateFrame(f)) { + last_sec = std::max(last_sec, f.ts); + last_nonempty_sec = f.ts; } } - query_latest(vals, current_sec); -} - -void CANParser::UpdateCans(uint64_t sec, const capnp::List::Reader& cans) { - //DEBUG("got %d messages\n", cans.size()); - - bool bus_empty = true; - - // parse the messages - for (const auto cmsg : cans) { - if (cmsg.getSrc() != bus) { - // DEBUG("skip %d: wrong bus\n", cmsg.getAddress()); - continue; - } - bus_empty = false; - auto state_it = message_states.find(cmsg.getAddress()); - if (state_it == message_states.end()) { - // DEBUG("skip %d: not specified\n", cmsg.getAddress()); - continue; - } - - auto dat = cmsg.getDat(); - - if (dat.size() > 64) { - DEBUG("got message longer than 64 bytes: 0x%X %zu\n", cmsg.getAddress(), dat.size()); - continue; - } - - // TODO: this actually triggers for some cars. fix and enable this - //if (dat.size() != state_it->second.size) { - // DEBUG("got message with unexpected length: expected %d, got %zu for %d", state_it->second.size, dat.size(), cmsg.getAddress()); - // continue; - //} - - std::vector data(dat.size(), 0); - memcpy(data.data(), dat.begin(), dat.size()); - state_it->second.parse(sec, data); - } - - // update bus timeout - if (!bus_empty) { - last_nonempty_sec = sec; - } - bus_timeout = (sec - last_nonempty_sec) > bus_timeout_threshold; + bus_timeout = (last_sec - last_nonempty_sec) > bus_timeout_threshold; + UpdateValid(last_sec); + query_latest(vals, frame_first_sec); } -#endif - -void CANParser::UpdateCans(uint64_t sec, const capnp::DynamicStruct::Reader& cmsg) { - // assume message struct is `cereal::CanData` and parse - assert(cmsg.has("address") && cmsg.has("src") && cmsg.has("dat") && cmsg.has("busTime")); - if (cmsg.get("src").as() != bus) { - DEBUG("skip %d: wrong bus\n", cmsg.get("address").as()); - return; +bool CANParser::updateFrame(const CanFrame &frame) { + // DEBUG("got %d messages\n", cans.size()); + if (frame.src != bus) { + // DEBUG("skip %d: wrong bus\n", cmsg.getAddress()); + return false; } - auto state_it = message_states.find(cmsg.get("address").as()); + auto state_it = message_states.find(frame.address); if (state_it == message_states.end()) { - DEBUG("skip %d: not specified\n", cmsg.get("address").as()); - return; + // DEBUG("skip %d: not specified\n", cmsg.getAddress()); + return false; } - auto dat = cmsg.get("dat").as(); - if (dat.size() > 64) return; // shouldn't ever happen - std::vector data(dat.size(), 0); - memcpy(data.data(), dat.begin(), dat.size()); - state_it->second.parse(sec, data); + if (frame.dat.size() > 64) { + DEBUG("got message longer than 64 bytes: 0x%X %zu\n", frame.address, frame.dat.size()); + return false; + } + + // TODO: this actually triggers for some cars. fix and enable this + // if (dat.size() != state_it->second.size) { + // DEBUG("got message with unexpected length: expected %d, got %zu for %d", state_it->second.size, dat.size(), cmsg.getAddress()); + // continue; + //} + return state_it->second.parse(frame.ts, frame.dat); } void CANParser::UpdateValid(uint64_t sec) { - const bool show_missing = (last_sec - first_sec) > 8e9; - + const bool show_missing = (sec - first_sec) > 8e9; bool _valid = true; bool _counters_valid = true; for (const auto& kv : message_states) { @@ -300,13 +231,10 @@ void CANParser::UpdateValid(uint64_t sec) { can_valid = (can_invalid_cnt < CAN_INVALID_CNT) && _counters_valid; } -void CANParser::query_latest(std::vector &vals, uint64_t last_ts) { - if (last_ts == 0) { - last_ts = last_sec; - } +void CANParser::query_latest(std::vector &vals, uint64_t frame_fist_sec) { for (auto& kv : message_states) { auto& state = kv.second; - if (last_ts != 0 && state.last_seen_nanos < last_ts) { + if (frame_fist_sec != 0 && state.last_seen_nanos < frame_fist_sec) { continue; } diff --git a/can/parser_pyx.pyx b/can/parser_pyx.pyx index 81d3e06ff4..c9bc1fb4ac 100644 --- a/can/parser_pyx.pyx +++ b/can/parser_pyx.pyx @@ -6,15 +6,16 @@ from libcpp.pair cimport pair from libcpp.string cimport string from libcpp.vector cimport vector from libcpp.unordered_set cimport unordered_set -from libc.stdint cimport uint32_t +from libc.stdint cimport uint32_t, uint64_t from .common cimport CANParser as cpp_CANParser -from .common cimport dbc_lookup, SignalValue, DBC +from .common cimport dbc_lookup, SignalValue, DBC, CanFrame import numbers +import capnp +from cereal import log from collections import defaultdict - cdef class CANParser: cdef: cpp_CANParser *can @@ -70,12 +71,33 @@ cdef class CANParser: for l in v.values(): # no-cython-lint l.clear() + cdef vector[CanFrame] can_frames + cdef CanFrame* frame + cdef uint64_t frame_fist_sec = 0 + try: + for s in strings: + with log.Event.from_bytes(s) as msg: + if frame_fist_sec == 0: + frame_fist_sec = msg.logMonoTime + else: + frame_fist_sec = min(frame_fist_sec, msg.logMonoTime) + + can = msg.sendCan if sendcan else msg.can + for c in can: + frame = &(can_frames.emplace_back()) + frame.ts = msg.logMonoTime + frame.src = c.src + frame.address = c.address + frame.dat = c.dat + except capnp.lib.capnp.KjException as ex: + raise RuntimeError(str(ex)) + cdef vector[SignalValue] new_vals - cdef unordered_set[uint32_t] updated_addrs + self.can.update_frames(frame_fist_sec, can_frames, new_vals) - self.can.update_strings(strings, new_vals, sendcan) - cdef vector[SignalValue].iterator it = new_vals.begin() + cdef unordered_set[uint32_t] updated_addrs cdef SignalValue* cv + cdef vector[SignalValue].iterator it = new_vals.begin() while it != new_vals.end(): cv = &deref(it) # Cast char * directly to unicode