Skip to content

Commit

Permalink
[OMON-669] Detect orbitId mismatch (#320)
Browse files Browse the repository at this point in the history
  • Loading branch information
awegrzyn authored Jul 13, 2023
1 parent c37d937 commit 35a3f83
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 1 deletion.
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ endif()

# Define project
project(Monitoring
VERSION 3.15.5
VERSION 3.16.0
DESCRIPTION "O2 Monitoring library"
LANGUAGES CXX
)
Expand Down Expand Up @@ -276,6 +276,7 @@ if(RdKafka_FOUND AND Protobuf_FOUND AND CURL_FOUND)

set(PROTO_EXAMPLES
examples/12-KafkaToInfluxDb.cxx
examples/14-OrbitId.cxx
examples/8-KafkaToHttpServer.cxx
)
foreach (example ${PROTO_EXAMPLES})
Expand Down
110 changes: 110 additions & 0 deletions examples/14-OrbitId.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
///
/// \file 14-OrbitId.cxx
/// \author Adam Wegrzynek <[email protected]>
///

#include "../src/Transports/KafkaConsumer.h"
#include "../src/Transports/Unix.h"

#include <iostream>
#include <memory>
#include <thread>
#include <boost/program_options.hpp>
#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string.hpp>

#include "envs.pb.h"

using namespace o2::monitoring;

std::map<std::string, unsigned int> detectorRunMap;

std::map<unsigned int, std::string> referenceOrbitIdMap;

std::string getValueFromMetric(const std::string& key, const std::string& metric) {
auto indexStart = metric.find(key + "=");
if (indexStart == std::string::npos) {
return {};
}
auto indexEnd = std::find_if(metric.begin() + indexStart, metric.end(), [](const char& s) { return s == ' ' or s == ','; });
return std::string(metric.begin() + indexStart + key.size() + 1, indexEnd);
}


int main(int argc, char* argv[])
{
boost::program_options::options_description desc("Program options");
desc.add_options()
("kafka-host", boost::program_options::value<std::string>()->required(), "Kafka broker hostname");
boost::program_options::variables_map vm;
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm);
boost::program_options::notify(vm);

std::vector<std::string> topics = {"aliecs.env_list.RUNNING", "cru.link_status"};
auto kafkaConsumer = std::make_unique<transports::KafkaConsumer>(vm["kafka-host"].as<std::string>() + ":9092", topics, "orbitid");
auto unixSocket = std::make_unique<transports::Unix>("/tmp/telegraf.sock");
for (;;) {
auto messages = kafkaConsumer->pull();
if (!messages.empty()) {
for (auto& message : messages) {
// handle active runs messages
if (message.first == "aliecs.env_list.RUNNING") {
aliceo2::envs::ActiveRunsList activeRuns;
activeRuns.ParseFromString(message.second);
detectorRunMap.clear();
for (int i = 0; i < activeRuns.activeruns_size(); i++) {
auto run = activeRuns.activeruns(i).runnumber();
for (int j = 0; j < activeRuns.activeruns(i).detectors_size(); j++) {
auto detector = activeRuns.activeruns(i).detectors(j);
for (auto& c : detector) c = std::tolower(c);
detectorRunMap.insert({detector, run});
}
}
if (detectorRunMap.empty()) {
std::cout << "No ongoing runs" << std::endl;
referenceOrbitIdMap.clear();
}
for (const auto &p : detectorRunMap) {
std::cout << p.first << " belongs to run " << p.second << std::endl;
}
// if SOR
// handle link status messages
} else if (message.first == "cru.link_status") {
auto detector = getValueFromMetric("detector", message.second);
auto orbitId = getValueFromMetric("orbitSor", message.second);
auto status = getValueFromMetric("status", message.second);
if (detector.empty() or orbitId.empty()) {
continue;
}
// if detector is not running
auto detectorInRun = detectorRunMap.find(detector);
if (detectorInRun == detectorRunMap.end()) {
continue;
}
// if link is excluded
if (status != "1i") {
continue;
}

std::string outputMetric = "orbitIdMismatch" + message.second.substr(message.second.find(","), message.second.find(" ") - message.second.find(",")) + ",run=" + std::to_string(detectorRunMap.at(detector));
auto referenceOrbit = referenceOrbitIdMap.find(detectorRunMap.at(detector));
if (referenceOrbit == referenceOrbitIdMap.end()) {
/// wait for trigger
if (orbitId == "0i") {
continue;
}
referenceOrbitIdMap.insert({detectorRunMap.at(detector), orbitId});
std::cout << "Set reference orbitId for run " << detectorRunMap.at(detector) << ": " << orbitId << std::endl;
unixSocket->send(outputMetric + " reference=" + orbitId);
}
auto referenceOrbitId = referenceOrbitIdMap.at(detectorRunMap.at(detector));
if (orbitId != referenceOrbitId) {
std::cout << "Abnormal condition for " << detector << "; expected orbitID: " << referenceOrbitId << " but got: " << orbitId << std::endl;
unixSocket->send(outputMetric + " mismatched=" + orbitId);
}
}
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}

0 comments on commit 35a3f83

Please sign in to comment.