Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release v3.17.4 #331

Merged
merged 14 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 63 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ endif()

# Define project
project(Monitoring
VERSION 3.15.4
VERSION 3.17.4
DESCRIPTION "O2 Monitoring library"
LANGUAGES CXX
)
Expand Down Expand Up @@ -79,6 +79,7 @@ find_package(CURL MODULE)
find_package(RdKafka CONFIG)
find_package(InfoLogger CONFIG)
find_package(Protobuf)
find_package(gRPC CONFIG)

####################################
# Set OUTPUT vars
Expand Down Expand Up @@ -276,6 +277,7 @@ if(RdKafka_FOUND AND Protobuf_FOUND AND CURL_FOUND)

set(PROTO_EXAMPLES
examples/12-KafkaToInfluxDb.cxx
examples/14-OrbitId.cxx
examples/8-KafkaToHttpServer.cxx
)
foreach (example ${PROTO_EXAMPLES})
Expand All @@ -286,12 +288,63 @@ if(RdKafka_FOUND AND Protobuf_FOUND AND CURL_FOUND)
Monitoring
Boost::program_options
protobuf::libprotobuf
$<$<BOOL:${InfoLogger_FOUND}>:AliceO2::InfoLogger>
)
target_compile_definitions(${example_name} PRIVATE $<$<BOOL:${InfoLogger_FOUND}>:O2_MONITORING_WITH_INFOLOGGER>)
target_include_directories(${example_name} PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
install(TARGETS ${example_name})
endforeach()
set_target_properties(8-KafkaToHttpServer PROPERTIES OUTPUT_NAME "o2-monitoring-env-webserver")
set_target_properties(12-KafkaToInfluxDb PROPERTIES OUTPUT_NAME "o2-monitoring-kafka-to-influxdb")
set_target_properties(14-OrbitId PROPERTIES OUTPUT_NAME "o2-monitoring-orbitid")
endif()

####################################
# gRPC
####################################
if(Protobuf_FOUND AND gRPC_FOUND)
set(PROTO_FILES ${CMAKE_CURRENT_SOURCE_DIR}/proto/odc.proto ${CMAKE_CURRENT_SOURCE_DIR}/proto/o2control.proto)
set(EXAMPLES examples/15-ODC.cxx examples/16-AliECS.cxx)
foreach(PROTO_FILE example IN ZIP_LISTS PROTO_FILES EXAMPLES)
get_filename_component(PROTO_OUTPUT_NAME ${PROTO_FILE} NAME_WE)
get_filename_component(PROTO_FILE_PREFIX ${PROTO_FILE} PATH)
set(PROTO_CPP_OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${PROTO_OUTPUT_NAME}.pb.cc)
set(GRPC_CPP_OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${PROTO_OUTPUT_NAME}.grpc.pb.cc)

add_custom_command(
OUTPUT "${PROTO_CPP_OUTPUT}"
COMMAND protobuf::protoc
ARGS --proto_path ${PROTO_FILE_PREFIX}
--cpp_out ${CMAKE_CURRENT_BINARY_DIR}
${PROTO_OUTPUT_NAME}.proto
DEPENDS ${PROTO_FILE}
COMMENT "Running protoc on ${PROTO_FILE}"
VERBATIM)

add_custom_command(
OUTPUT "${GRPC_CPP_OUTPUT}"
COMMAND protobuf::protoc
ARGS --proto_path ${PROTO_FILE_PREFIX}
--grpc_out=${CMAKE_CURRENT_BINARY_DIR}
--plugin=protoc-gen-grpc=$<TARGET_FILE:gRPC::grpc_cpp_plugin>
${PROTO_OUTPUT_NAME}.proto
DEPENDS ${PROTO_FILE}
COMMENT "Running protoc/gRPC on ${PROTO_FILE}"
VERBATIM)

get_filename_component(example_name ${example} NAME)
string(REGEX REPLACE ".cxx" "" example_name ${example_name})
add_executable(${example_name} ${example} ${PROTO_CPP_OUTPUT} ${GRPC_CPP_OUTPUT})
target_link_libraries(${example_name} PRIVATE
Monitoring
gRPC::grpc++
protobuf::libprotobuf
Boost::program_options
)
target_include_directories(${example_name} PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
endforeach()
set_target_properties(15-ODC PROPERTIES OUTPUT_NAME "o2-monitoring-odc")
set_target_properties(16-AliECS PROPERTIES OUTPUT_NAME "o2-monitoring-aliecs-tasks")
endif()

####################################
Expand Down Expand Up @@ -344,6 +397,15 @@ if(RdKafka_FOUND)
install(TARGETS 11-KafkaToWebsocket)
endif()


if(RdKafka_FOUND AND Protobuf_FOUND)
install(TARGETS 14-OrbitId)
endif()

if(Protobuf_FOUND AND gRPC_FOUND)
install(TARGETS 15-ODC)
endif()

# Create version file
include(CMakePackageConfigHelpers)
write_basic_package_version_file("${CMAKE_CURRENT_BINARY_DIR}/cmake/MonitoringConfigVersion.cmake"
Expand Down
9 changes: 6 additions & 3 deletions examples/12-KafkaToInfluxDb.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <boost/algorithm/string.hpp>

#include "envs.pb.h"
#include "../src/MonLogger.h"

using namespace o2::monitoring;

Expand All @@ -30,6 +31,7 @@ int main(int argc, char* argv[])
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm);
boost::program_options::notify(vm);

MonLogger::mLoggerSeverity = Severity::Debug;
std::vector<std::string> topics = {"aliecs.env_leave_state.RUNNING", "aliecs.env_state.RUNNING"};
auto kafkaConsumer = std::make_unique<transports::KafkaConsumer>(vm["kafka-host"].as<std::string>() + ":9092", topics, "aliecs-run-times");
auto httpTransport = std::make_unique<transports::HTTP>(
Expand All @@ -48,12 +50,13 @@ int main(int argc, char* argv[])
if (stateChange.envinfo().state().empty()) {
continue;
}
std::cout << stateChange.envinfo().environmentid() << "/" << stateChange.envinfo().runnumber() << " " << change.first << " SOR: " <<stateChange.envinfo().enterstatetimestamp() << " EOR: " << stateChange.timestamp() << std::endl;
auto metric = Metric{"run_times"};
if (change.first.find("leave") != std::string::npos) {
metric.addValue(stateChange.envinfo().enterstatetimestamp(), "sor").addValue(stateChange.timestamp(), "eor");
metric.addValue(stateChange.timestamp(), "eor");
MonLogger::Get() << stateChange.envinfo().environmentid() << "/" << stateChange.envinfo().runnumber() << " " << change.first << " EOR: " << stateChange.timestamp() << MonLogger::End();
} else {
metric.addValue(stateChange.envinfo().runtype(), "type");
metric.addValue(stateChange.envinfo().runtype(), "type").addValue(stateChange.envinfo().enterstatetimestamp(), "sor");
MonLogger::Get() << stateChange.envinfo().environmentid() << "/" << stateChange.envinfo().runnumber() << " " << change.first << " SOR: " <<stateChange.envinfo().enterstatetimestamp() << MonLogger::End();
}
int run = stateChange.envinfo().runnumber();
if (run > 1) {
Expand Down
133 changes: 133 additions & 0 deletions examples/14-OrbitId.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
///
/// \file 14-OrbitId.cxx
/// \author Adam Wegrzynek <[email protected]>
///

#include "../src/Transports/KafkaConsumer.h"
#include "../src/Transports/Unix.h"

#include <iostream>
#include <memory>
#include <thread>
#include <boost/program_options.hpp>
#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string.hpp>

#include "envs.pb.h"
#include "../src/MonLogger.h"

using namespace o2::monitoring;

std::map<std::string, unsigned int> detectorRunMap;

std::map<unsigned int, std::string> referenceOrbitIdMap;

std::string getValueFromMetric(const std::string& key, const std::string& metric) {
auto indexStart = metric.find(key + "=");
if (indexStart == std::string::npos) {
return {};
}
auto indexEnd = std::find_if(metric.begin() + indexStart, metric.end(), [](const char& s) { return s == ' ' or s == ','; });
return std::string(metric.begin() + indexStart + key.size() + 1, indexEnd);
}


int main(int argc, char* argv[])
{
boost::program_options::options_description desc("Program options");
desc.add_options()
("kafka-host", boost::program_options::value<std::string>()->required(), "Kafka broker hostname");
boost::program_options::variables_map vm;
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm);
boost::program_options::notify(vm);

MonLogger::mLoggerSeverity = Severity::Debug;
std::vector<std::string> topics = {"aliecs.env_list.RUNNING", "cru.link_status"};
auto kafkaConsumer = std::make_unique<transports::KafkaConsumer>(vm["kafka-host"].as<std::string>() + ":9092", topics, "orbitid");
auto unixSocket = std::make_unique<transports::Unix>("/tmp/telegraf.sock");
for (;;) {
auto messages = kafkaConsumer->pull();
if (!messages.empty()) {
for (auto& message : messages) {
// handle active runs messages
if (message.first == "aliecs.env_list.RUNNING") {
aliceo2::envs::ActiveRunsList activeRuns;
activeRuns.ParseFromString(message.second);
detectorRunMap.clear();
for (int i = 0; i < activeRuns.activeruns_size(); i++) {
auto run = activeRuns.activeruns(i).runnumber();
for (int j = 0; j < activeRuns.activeruns(i).detectors_size(); j++) {
auto detector = activeRuns.activeruns(i).detectors(j);
auto runType = activeRuns.activeruns(i).runtype();
if (runType.find("calib") != std::string::npos || runType.find("CALIB") != std::string::npos) {
MonLogger::Get() << "Skipping calibration run " << run << MonLogger::End();
continue;
}
for (auto& c : detector) c = std::tolower(c);
detectorRunMap.insert({detector, run});
}
}
if (detectorRunMap.empty()) {
MonLogger::Get() << "No ongoing runs" << MonLogger::End();
referenceOrbitIdMap.clear();
}
for (const auto &p : detectorRunMap) {
MonLogger::Get() << p.first << " belongs to run " << p.second << MonLogger::End();
}
// if SOR
// handle link status messages
} else if (message.first == "cru.link_status") {
auto detector = getValueFromMetric("detector", message.second);
auto orbitId = getValueFromMetric("orbitSor", message.second);
auto status = getValueFromMetric("status", message.second);
if (detector.empty() or orbitId.empty()) {
continue;
}
// Temporary disregard MCH and MID
// TODO: remove
if (detector == "mch" || detector == "mid") {
continue;
}

// if detector is not running
auto detectorInRun = detectorRunMap.find(detector);
if (detectorInRun == detectorRunMap.end()) {
continue;
}
// if link is excluded
if (status != "1i") {
continue;
}
// Temporary disable 0s
// TODO: remove
if (orbitId == "0i") {
continue;
}

// Drop wrongly reported values during BAR read (0xFFFFFFFF)
if (orbitId == "4294967295i") {
continue;
}

std::string outputMetric = "orbitIdMismatch" + message.second.substr(message.second.find(","), message.second.find(" ") - message.second.find(",")) + ",run=" + std::to_string(detectorRunMap.at(detector));
auto referenceOrbit = referenceOrbitIdMap.find(detectorRunMap.at(detector));
if (referenceOrbit == referenceOrbitIdMap.end()) {
/// wait for trigger
if (orbitId == "0i") {
continue;
}
referenceOrbitIdMap.insert({detectorRunMap.at(detector), orbitId});
MonLogger::Get() << "Set reference orbitId for run " << detectorRunMap.at(detector) << ": " << orbitId << MonLogger::End();
unixSocket->send(outputMetric + " reference=" + orbitId);
}
auto referenceOrbitId = referenceOrbitIdMap.at(detectorRunMap.at(detector));
if (orbitId != referenceOrbitId) {
MonLogger::Get() << "Abnormal condition for " << detector << "; expected orbitID: " << referenceOrbitId << " but got: " << orbitId << MonLogger::End();
unixSocket->send(outputMetric + " mismatched=" + orbitId);
}
}
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
Loading
Loading