Skip to content

Commit

Permalink
[OMON-733] Create bucket on SOR (#335)
Browse files Browse the repository at this point in the history
  • Loading branch information
awegrzyn authored Nov 10, 2023
1 parent d883244 commit d3007d1
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 3 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ endif()

# Define project
project(Monitoring
VERSION 3.17.5
VERSION 3.17.6
DESCRIPTION "O2 Monitoring library"
LANGUAGES CXX
)
Expand Down
34 changes: 32 additions & 2 deletions examples/12-KafkaToInfluxDb.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,21 @@

using namespace o2::monitoring;


std::string getCreateBucketBody(const std::string& orgId, const int run) {
std::stringstream postPayload;
postPayload << R"({
"orgID": ")" + orgId + R"(",
"name": ")" + std::to_string(run) + R"(",
"retentionRules": [{
"type": "expire",
"everySeconds": 86400,
"shardGroupDurationSeconds": 86400
}]
})";
return postPayload.str();
}

int main(int argc, char* argv[])
{
boost::program_options::options_description desc("Program options");
Expand All @@ -26,7 +41,10 @@ int main(int argc, char* argv[])
("influxdb-url", boost::program_options::value<std::string>()->required(), "InfluxDB hostname")
("influxdb-token", boost::program_options::value<std::string>()->required(), "InfluxDB token")
("influxdb-org", boost::program_options::value<std::string>()->default_value("cern"), "InfluxDB organisation")
("influxdb-bucket", boost::program_options::value<std::string>()->default_value("aliecs"), "InfluxDB bucket");
("influxdb-bucket", boost::program_options::value<std::string>()->default_value("aliecs"), "InfluxDB bucket")
("influxdb-dpl-url", boost::program_options::value<std::string>(), "InfluxDB DPL ID")
("influxdb-dpl-orgid", boost::program_options::value<std::string>(), "InfluxDB DPL organization ID")
("influxdb-dpl-token", boost::program_options::value<std::string>(), "InfluxDB DPL token");
boost::program_options::variables_map vm;
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm);
boost::program_options::notify(vm);
Expand All @@ -41,6 +59,14 @@ int main(int argc, char* argv[])
);
httpTransport->addHeader("Authorization: Token " + vm["influxdb-token"].as<std::string>());
auto influxdbBackend = std::make_unique<backends::InfluxDB>(std::move(httpTransport));

std::unique_ptr<transports::HTTP> influxBucketApi;
if (vm.count("influxdb-dpl-orgid") && vm.count("influxdb-dpl-url") && vm.count("influxdb-dpl-token")) {
MonLogger::Get() << "Creating bucket HTTP API for " << vm["influxdb-dpl-url"].as<std::string>() << MonLogger::End();
influxBucketApi.reset(new transports::HTTP(vm["influxdb-dpl-url"].as<std::string>() + "/api/v2/buckets"));
influxBucketApi->addHeader("Authorization: Token " + vm["influxdb-dpl-token"].as<std::string>());
}

for (;;) {
auto changes = kafkaConsumer->pull();
if (!changes.empty()) {
Expand All @@ -50,15 +76,19 @@ int main(int argc, char* argv[])
if (stateChange.envinfo().state().empty()) {
continue;
}
int run = stateChange.envinfo().runnumber();
auto metric = Metric{"run_times"};
if (change.first.find("leave") != std::string::npos) {
metric.addValue(stateChange.timestamp(), "eor");
MonLogger::Get() << stateChange.envinfo().environmentid() << "/" << stateChange.envinfo().runnumber() << " " << change.first << " EOR: " << stateChange.timestamp() << MonLogger::End();
} else {
metric.addValue(stateChange.envinfo().runtype(), "type").addValue(stateChange.envinfo().enterstatetimestamp(), "sor");
MonLogger::Get() << stateChange.envinfo().environmentid() << "/" << stateChange.envinfo().runnumber() << " " << change.first << " SOR: " <<stateChange.envinfo().enterstatetimestamp() << MonLogger::End();
if (vm.count("influxdb-dpl-orgid") && run > 1) {
MonLogger::Get() << "Request sent to create bucket " << stateChange.envinfo().runnumber() << " on " << vm["influxdb-dpl-url"].as<std::string>() << MonLogger::End();
influxBucketApi->send(getCreateBucketBody(vm["influxdb-dpl-orgid"].as<std::string>(), stateChange.envinfo().runnumber()));
}
}
int run = stateChange.envinfo().runnumber();
if (run > 1) {
influxdbBackend->sendWithRun(metric, stateChange.envinfo().environmentid(), std::to_string(run));
}
Expand Down

0 comments on commit d3007d1

Please sign in to comment.