diff --git a/CMakeLists.txt b/CMakeLists.txt index bc70096a..fb2a9225 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -31,7 +31,7 @@ endif() # Define project project(Monitoring - VERSION 3.15.5 + VERSION 3.16.0 DESCRIPTION "O2 Monitoring library" LANGUAGES CXX ) @@ -276,6 +276,7 @@ if(RdKafka_FOUND AND Protobuf_FOUND AND CURL_FOUND) set(PROTO_EXAMPLES examples/12-KafkaToInfluxDb.cxx + examples/14-OrbitId.cxx examples/8-KafkaToHttpServer.cxx ) foreach (example ${PROTO_EXAMPLES}) diff --git a/examples/14-OrbitId.cxx b/examples/14-OrbitId.cxx new file mode 100644 index 00000000..fc7e2883 --- /dev/null +++ b/examples/14-OrbitId.cxx @@ -0,0 +1,110 @@ +/// +/// \file 14-OrbitId.cxx +/// \author Adam Wegrzynek +/// + +#include "../src/Transports/KafkaConsumer.h" +#include "../src/Transports/Unix.h" + +#include +#include +#include +#include +#include +#include + +#include "envs.pb.h" + +using namespace o2::monitoring; + +std::map detectorRunMap; + +std::map referenceOrbitIdMap; + +std::string getValueFromMetric(const std::string& key, const std::string& metric) { + auto indexStart = metric.find(key + "="); + if (indexStart == std::string::npos) { + return {}; + } + auto indexEnd = std::find_if(metric.begin() + indexStart, metric.end(), [](const char& s) { return s == ' ' or s == ','; }); + return std::string(metric.begin() + indexStart + key.size() + 1, indexEnd); +} + + +int main(int argc, char* argv[]) +{ + boost::program_options::options_description desc("Program options"); + desc.add_options() + ("kafka-host", boost::program_options::value()->required(), "Kafka broker hostname"); + boost::program_options::variables_map vm; + boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm); + boost::program_options::notify(vm); + + std::vector topics = {"aliecs.env_list.RUNNING", "cru.link_status"}; + auto kafkaConsumer = std::make_unique(vm["kafka-host"].as() + ":9092", topics, "orbitid"); + auto unixSocket = std::make_unique("/tmp/telegraf.sock"); + for (;;) { + auto messages = kafkaConsumer->pull(); + if (!messages.empty()) { + for (auto& message : messages) { + // handle active runs messages + if (message.first == "aliecs.env_list.RUNNING") { + aliceo2::envs::ActiveRunsList activeRuns; + activeRuns.ParseFromString(message.second); + detectorRunMap.clear(); + for (int i = 0; i < activeRuns.activeruns_size(); i++) { + auto run = activeRuns.activeruns(i).runnumber(); + for (int j = 0; j < activeRuns.activeruns(i).detectors_size(); j++) { + auto detector = activeRuns.activeruns(i).detectors(j); + for (auto& c : detector) c = std::tolower(c); + detectorRunMap.insert({detector, run}); + } + } + if (detectorRunMap.empty()) { + std::cout << "No ongoing runs" << std::endl; + referenceOrbitIdMap.clear(); + } + for (const auto &p : detectorRunMap) { + std::cout << p.first << " belongs to run " << p.second << std::endl; + } + // if SOR + // handle link status messages + } else if (message.first == "cru.link_status") { + auto detector = getValueFromMetric("detector", message.second); + auto orbitId = getValueFromMetric("orbitSor", message.second); + auto status = getValueFromMetric("status", message.second); + if (detector.empty() or orbitId.empty()) { + continue; + } + // if detector is not running + auto detectorInRun = detectorRunMap.find(detector); + if (detectorInRun == detectorRunMap.end()) { + continue; + } + // if link is excluded + if (status != "1i") { + continue; + } + + std::string outputMetric = "orbitIdMismatch" + message.second.substr(message.second.find(","), message.second.find(" ") - message.second.find(",")) + ",run=" + std::to_string(detectorRunMap.at(detector)); + auto referenceOrbit = referenceOrbitIdMap.find(detectorRunMap.at(detector)); + if (referenceOrbit == referenceOrbitIdMap.end()) { + /// wait for trigger + if (orbitId == "0i") { + continue; + } + referenceOrbitIdMap.insert({detectorRunMap.at(detector), orbitId}); + std::cout << "Set reference orbitId for run " << detectorRunMap.at(detector) << ": " << orbitId << std::endl; + unixSocket->send(outputMetric + " reference=" + orbitId); + } + auto referenceOrbitId = referenceOrbitIdMap.at(detectorRunMap.at(detector)); + if (orbitId != referenceOrbitId) { + std::cout << "Abnormal condition for " << detector << "; expected orbitID: " << referenceOrbitId << " but got: " << orbitId << std::endl; + unixSocket->send(outputMetric + " mismatched=" + orbitId); + } + } + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } +}