From d3007d11cd38f10ee13791c350ac95824ccb902c Mon Sep 17 00:00:00 2001 From: Adam Wegrzynek Date: Fri, 10 Nov 2023 12:27:32 +0100 Subject: [PATCH] [OMON-733] Create bucket on SOR (#335) --- CMakeLists.txt | 2 +- examples/12-KafkaToInfluxDb.cxx | 34 +++++++++++++++++++++++++++++++-- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 357aeeb0..82a933bb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -31,7 +31,7 @@ endif() # Define project project(Monitoring - VERSION 3.17.5 + VERSION 3.17.6 DESCRIPTION "O2 Monitoring library" LANGUAGES CXX ) diff --git a/examples/12-KafkaToInfluxDb.cxx b/examples/12-KafkaToInfluxDb.cxx index 7b5c4dbf..218f4614 100644 --- a/examples/12-KafkaToInfluxDb.cxx +++ b/examples/12-KafkaToInfluxDb.cxx @@ -18,6 +18,21 @@ using namespace o2::monitoring; + +std::string getCreateBucketBody(const std::string& orgId, const int run) { + std::stringstream postPayload; + postPayload << R"({ + "orgID": ")" + orgId + R"(", + "name": ")" + std::to_string(run) + R"(", + "retentionRules": [{ + "type": "expire", + "everySeconds": 86400, + "shardGroupDurationSeconds": 86400 + }] + })"; + return postPayload.str(); +} + int main(int argc, char* argv[]) { boost::program_options::options_description desc("Program options"); @@ -26,7 +41,10 @@ int main(int argc, char* argv[]) ("influxdb-url", boost::program_options::value()->required(), "InfluxDB hostname") ("influxdb-token", boost::program_options::value()->required(), "InfluxDB token") ("influxdb-org", boost::program_options::value()->default_value("cern"), "InfluxDB organisation") - ("influxdb-bucket", boost::program_options::value()->default_value("aliecs"), "InfluxDB bucket"); + ("influxdb-bucket", boost::program_options::value()->default_value("aliecs"), "InfluxDB bucket") + ("influxdb-dpl-url", boost::program_options::value(), "InfluxDB DPL ID") + ("influxdb-dpl-orgid", boost::program_options::value(), "InfluxDB DPL organization ID") + ("influxdb-dpl-token", boost::program_options::value(), "InfluxDB DPL token"); boost::program_options::variables_map vm; boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm); boost::program_options::notify(vm); @@ -41,6 +59,14 @@ int main(int argc, char* argv[]) ); httpTransport->addHeader("Authorization: Token " + vm["influxdb-token"].as()); auto influxdbBackend = std::make_unique(std::move(httpTransport)); + + std::unique_ptr influxBucketApi; + if (vm.count("influxdb-dpl-orgid") && vm.count("influxdb-dpl-url") && vm.count("influxdb-dpl-token")) { + MonLogger::Get() << "Creating bucket HTTP API for " << vm["influxdb-dpl-url"].as() << MonLogger::End(); + influxBucketApi.reset(new transports::HTTP(vm["influxdb-dpl-url"].as() + "/api/v2/buckets")); + influxBucketApi->addHeader("Authorization: Token " + vm["influxdb-dpl-token"].as()); + } + for (;;) { auto changes = kafkaConsumer->pull(); if (!changes.empty()) { @@ -50,6 +76,7 @@ int main(int argc, char* argv[]) if (stateChange.envinfo().state().empty()) { continue; } + int run = stateChange.envinfo().runnumber(); auto metric = Metric{"run_times"}; if (change.first.find("leave") != std::string::npos) { metric.addValue(stateChange.timestamp(), "eor"); @@ -57,8 +84,11 @@ int main(int argc, char* argv[]) } else { metric.addValue(stateChange.envinfo().runtype(), "type").addValue(stateChange.envinfo().enterstatetimestamp(), "sor"); MonLogger::Get() << stateChange.envinfo().environmentid() << "/" << stateChange.envinfo().runnumber() << " " << change.first << " SOR: " < 1) { + MonLogger::Get() << "Request sent to create bucket " << stateChange.envinfo().runnumber() << " on " << vm["influxdb-dpl-url"].as() << MonLogger::End(); + influxBucketApi->send(getCreateBucketBody(vm["influxdb-dpl-orgid"].as(), stateChange.envinfo().runnumber())); + } } - int run = stateChange.envinfo().runnumber(); if (run > 1) { influxdbBackend->sendWithRun(metric, stateChange.envinfo().environmentid(), std::to_string(run)); }