From f9740163eae4b741779a2705d2b3e42d3a8b1d1a Mon Sep 17 00:00:00 2001 From: Adam Wegrzynek Date: Fri, 25 Aug 2017 07:36:34 +0100 Subject: [PATCH] Zabbix: multiple metrics support; connection issue resolved (#16) --- README.md | 13 ++++++++ include/Monitoring/Backend.h | 3 ++ include/Monitoring/Collector.h | 2 +- src/Backends/ApMonBackend.cxx | 10 ++++++ src/Backends/ApMonBackend.h | 6 ++++ src/Backends/Flume.cxx | 10 ++++++ src/Backends/Flume.h | 9 +++++- src/Backends/InfluxDB.cxx | 6 ++-- src/Backends/InfluxDB.h | 2 +- src/Backends/InfoLoggerBackend.cxx | 10 ++++++ src/Backends/InfoLoggerBackend.h | 8 ++++- src/Backends/Zabbix.cxx | 50 ++++++++++++++++++++---------- src/Backends/Zabbix.h | 19 ++++++++++-- src/Collector.cxx | 24 ++------------ 14 files changed, 123 insertions(+), 49 deletions(-) diff --git a/README.md b/README.md index d9f28147c..ab9d47bdf 100644 --- a/README.md +++ b/README.md @@ -156,6 +156,8 @@ Metric can be sent by one of the following ways: + `send(T value, std::string name)` + `sendTagged(T value, std::string name, std::vector&& tags)` + `sendTimed(T value, std::string name, std::chrono::time_point& timestamp)` +3. Sending multiple metrics (only InfluxDB and Zabbix are supported, other backends fallback into sending metrics one by one) + + `void send(std::string name, std::vector&& metrics)` ## Derived metrics The module can calculate derived metrics. To do so, use `addDerivedMetric(std::string name, DerivedMetricMode mode)` with one of two available modes: @@ -303,6 +305,17 @@ collector->send(10, "myMetric"); collector->send({10, "myMetric"}); ``` +### Sending multiple metrics at once - examples/8-Multiple.cxx +```cpp +// configure monitoring (only once per process), pass configuration path as parameter +Monitoring::Configure("file:///home/awegrzyn/hackathon/Monitoring/examples/config-default.ini"); + +// Send two metrics at once as a single measurement +Monitoring::Get().send("measurementName", { + {10, "myMetricInt"}, + {10.10, "myMetricFloat"} +}); +``` ## System monitoring, server-side backends installation and configuration This guide explains manual installation. For `ansible` deployment see [AliceO2Group/system-configuration](https://gitlab.cern.ch/AliceO2Group/system-configuration/tree/master/ansible) gitlab repo. diff --git a/include/Monitoring/Backend.h b/include/Monitoring/Backend.h index 966d1d105..f49467d10 100644 --- a/include/Monitoring/Backend.h +++ b/include/Monitoring/Backend.h @@ -32,6 +32,9 @@ class Backend /// Sends metric via backend virtual void send(const Metric& metric) = 0; + /// Sends multiple metrics (if supported), otherwise falls back into sending single metrics + virtual void sendMultiple(std::string measurement, std::vector&& metrics) = 0; + /// Sets a tag virtual void addGlobalTag(std::string name, std::string value) = 0; }; diff --git a/include/Monitoring/Collector.h b/include/Monitoring/Collector.h index cadb3f362..b8f2c5fb6 100644 --- a/include/Monitoring/Collector.h +++ b/include/Monitoring/Collector.h @@ -56,7 +56,7 @@ class Collector /// Sends a metric to all avaliabes backends /// If metric has been added to DerivedMetric the derived metric is calculated (see addDerivedMetric method) /// \param metric r-value to metric object - void send(Metric&& metric, std::size_t skipBackend = -1); + void send(Metric&& metric); /// Sends multiple metrics to as a single measurement /// If it's not supported by backend it fallbacks into sending multiple metrics diff --git a/src/Backends/ApMonBackend.cxx b/src/Backends/ApMonBackend.cxx index 7ec58da17..74014b992 100644 --- a/src/Backends/ApMonBackend.cxx +++ b/src/Backends/ApMonBackend.cxx @@ -42,6 +42,16 @@ void ApMonBackend::addGlobalTag(std::string name, std::string value) entity += value; } +void ApMonBackend::sendMultiple(std::string measurement, std::vector&& metrics) +{ + for (auto& m : metrics) { + std::string tempName = m.getName(); + m.setName(measurement + "-" + m.getName()); + send(m); + m.setName(tempName); + } +} + void ApMonBackend::send(const Metric& metric) { std::string name = metric.getName(); diff --git a/src/Backends/ApMonBackend.h b/src/Backends/ApMonBackend.h index effe75ec1..243e4f0b8 100644 --- a/src/Backends/ApMonBackend.h +++ b/src/Backends/ApMonBackend.h @@ -42,6 +42,12 @@ class ApMonBackend final : public Backend /// \param metric reference to metric object: void send(const Metric& metric) override; + /// Sends multiple metric in single packet + /// Not supported by the backend therefore it falls back to sending metric one by one + /// \param name measurement name + /// \param metrics list of metrics + void sendMultiple(std::string measurement, std::vector&& metrics) override; + /// Extends entity value /// \param name tag name (unused) /// \param value tag value that is concatenated to entity string diff --git a/src/Backends/Flume.cxx b/src/Backends/Flume.cxx index 8cff98a0b..3b9c0d76c 100644 --- a/src/Backends/Flume.cxx +++ b/src/Backends/Flume.cxx @@ -45,6 +45,16 @@ std::string Flume::metricToJson(const Metric& metric) return s; } +void Flume::sendMultiple(std::string measurement, std::vector&& metrics) +{ + for (auto& m : metrics) { + std::string tempName = m.getName(); + m.setName(measurement + "-" + m.getName()); + send(m); + m.setName(tempName); + } +} + void Flume::send(const Metric& metric) { mTransport->send(metricToJson(metric)); diff --git a/src/Backends/Flume.h b/src/Backends/Flume.h index 492537cce..5e0d4dd91 100644 --- a/src/Backends/Flume.h +++ b/src/Backends/Flume.h @@ -37,7 +37,7 @@ class Flume final : public Backend /// Default destructor ~Flume() = default; - // Convert timestamp to unsigned long as required by Flume + /// Convert timestamp to unsigned long as required by Flume /// \param chrono time_point timestamp /// \return timestamp in nanoseconds inline unsigned long convertTimestamp(const std::chrono::time_point& timestamp); @@ -46,6 +46,13 @@ class Flume final : public Backend /// \param metric reference to metric object void send(const Metric& metric) override; + /// Sends multiple metric in single packet + /// Not supported by the backend therefore it falls back to sending metric one by one + /// TODO: changed required in Flume Source + /// \param name measurement name + /// \param metrics list of metrics + void sendMultiple(std::string measurement, std::vector&& metrics) override; + /// Adds tag /// \param name tag name /// \param value tag value diff --git a/src/Backends/InfluxDB.cxx b/src/Backends/InfluxDB.cxx index cba44a673..f2d4fbcb1 100644 --- a/src/Backends/InfluxDB.cxx +++ b/src/Backends/InfluxDB.cxx @@ -49,11 +49,11 @@ void InfluxDB::escape(std::string& escaped) boost::replace_all(escaped, " ", "\\ "); } -void InfluxDB::sendMultiple(std::string name, std::vector&& metrics) +void InfluxDB::sendMultiple(std::string measurement, std::vector&& metrics) { - escape(name); + escape(measurement); std::stringstream convert; - convert << name << "," << tagSet << " "; + convert << measurement << "," << tagSet << " "; for (const auto& metric : metrics) { std::string value = boost::lexical_cast(metric.getValue()); diff --git a/src/Backends/InfluxDB.h b/src/Backends/InfluxDB.h index 7f536d4f6..f48afcc24 100644 --- a/src/Backends/InfluxDB.h +++ b/src/Backends/InfluxDB.h @@ -53,7 +53,7 @@ class InfluxDB final : public Backend /// Sends multiple values in single measurement /// \param name measurement name /// \param metrics list of metrics - void sendMultiple(std::string name, std::vector&& metrics); + void sendMultiple(std::string measurement, std::vector&& metrics); /// Adds tag /// \param name tag name diff --git a/src/Backends/InfoLoggerBackend.cxx b/src/Backends/InfoLoggerBackend.cxx index cbc86f2f8..348b48b78 100644 --- a/src/Backends/InfoLoggerBackend.cxx +++ b/src/Backends/InfoLoggerBackend.cxx @@ -37,6 +37,16 @@ void InfoLoggerBackend::addGlobalTag(std::string name, std::string value) tagString += name + "=" + value; } +void InfoLoggerBackend::sendMultiple(std::string measurement, std::vector&& metrics) +{ + for (auto& m : metrics) { + std::string tempName = m.getName(); + m.setName(measurement + "-" + m.getName()); + send(m); + m.setName(tempName); + } +} + void InfoLoggerBackend::send(const Metric& metric) { std::string metricTags{}; diff --git a/src/Backends/InfoLoggerBackend.h b/src/Backends/InfoLoggerBackend.h index e97897ec7..0547d17fa 100644 --- a/src/Backends/InfoLoggerBackend.h +++ b/src/Backends/InfoLoggerBackend.h @@ -31,10 +31,16 @@ class InfoLoggerBackend final : public Backend /// Default destructor ~InfoLoggerBackend() = default; - /// Sends metric to InfluxDB + /// Sends metric to InfoLogger library /// \param metric reference to metric object void send(const Metric& metric) override; + /// Sends multiple metric at once + /// Not supported by the backend therefore it falls back to sending metric one by one + /// \param name measurement name + /// \param metrics list of metrics + void sendMultiple(std::string measurement, std::vector&& metrics) override; + /// Adds tag /// \param name tag name /// \param value tag value diff --git a/src/Backends/Zabbix.cxx b/src/Backends/Zabbix.cxx index 3c681f91a..99dcdf2c6 100644 --- a/src/Backends/Zabbix.cxx +++ b/src/Backends/Zabbix.cxx @@ -5,7 +5,6 @@ #include "Zabbix.h" #include -#include #include #include "../Transports/TCP.h" #include "../Exceptions/MonitoringInternalException.h" @@ -20,8 +19,8 @@ namespace Backends { Zabbix::Zabbix(const std::string &hostname, int port) + : socketHostname(hostname), socketPort(port) { - transport = std::make_unique(hostname, port); MonLogger::Get() << "Zabbix/TCP backend initialized" << " ("<< hostname << ":" << port << ")" << MonLogger::End(); } @@ -36,29 +35,46 @@ inline std::string Zabbix::convertTimestamp(const std::chrono::time_point("host", hostname); data.put("key", metric.getName()); data.put("value", boost::lexical_cast(metric.getValue())); data.put("clock", convertTimestamp(metric.getTimestamp())); - dataArray.push_back(std::make_pair("", data)); + sendOverTcp(createMessage(dataArray)); +} + +void Zabbix::sendMultiple(std::string measurement, std::vector&& metrics) +{ + boost::property_tree::ptree dataArray; + for (auto& metric : metrics) { + boost::property_tree::ptree data; + data.put("host", hostname); + data.put("key", metric.getName()); + data.put("value", boost::lexical_cast(metric.getValue())); + data.put("clock", convertTimestamp(metric.getTimestamp())); + dataArray.push_back(std::make_pair("", data)); + } + sendOverTcp(createMessage(dataArray)); +} + +std::string Zabbix::createMessage(const boost::property_tree::ptree& dataArray) { + boost::property_tree::ptree request; request.put("request", "sender data"); request.add_child("data", dataArray); - std::stringstream ss; + std::stringstream ss; write_json(ss, request); - - std::string noWhiteSpaces = ss.str(); - noWhiteSpaces.erase(std::remove_if( noWhiteSpaces.begin(), noWhiteSpaces.end(), - [](char c){ return (c =='\r' || c =='\t' || c == ' ' || c == '\n');}), noWhiteSpaces.end() ); - noWhiteSpaces.insert(18, " "); - // prepare Zabbix header - uint32_t length = noWhiteSpaces.length(); + std::string message = ss.str(); + message.erase(std::remove_if( message.begin(), message.end(), + [](char c){ return (c =='\r' || c =='\t' || c == ' ' || c == '\n');}), message.end() ); + message.insert(18, " "); + + uint32_t length = message.length(); std::vector header = { 'Z', 'B', 'X', 'D', '\x01', static_cast(length & 0xFF), @@ -67,13 +83,13 @@ std::string Zabbix::metricToZabbix(const Metric& metric) static_cast((length >> 24) & 0x000000FF), '\x00','\x00','\x00','\x00' }; - - return std::string(header.begin(), header.end()) + noWhiteSpaces; + return std::string(header.begin(), header.end()) + message; } -void Zabbix::send(const Metric& metric) { +void Zabbix::sendOverTcp(std::string&& message) { try { - transport->send(metricToZabbix(metric)); + std::unique_ptr transport = std::make_unique(socketHostname, socketPort); + transport->send(std::move(message)); } catch (MonitoringInternalException&) { } } diff --git a/src/Backends/Zabbix.h b/src/Backends/Zabbix.h index b7392a2c1..1bf148d39 100644 --- a/src/Backends/Zabbix.h +++ b/src/Backends/Zabbix.h @@ -9,6 +9,7 @@ #include "Monitoring/Backend.h" #include "../Transports/TCP.h" #include "../MonLogger.h" +#include #include #include @@ -39,20 +40,32 @@ class Zabbix final : public Backend /// \param metric reference to metric object void send(const Metric& metric) override; + /// Sends multiple metric in single packet + /// \param name measurement name + /// \param metrics list of metrics + void sendMultiple(std::string measurement, std::vector&& metrics) override; + /// Adds tag /// \param name tag name /// \param value tag value void addGlobalTag(std::string name, std::string value) override; private: - /// TCP transport - std::unique_ptr transport; + /// Zabbix server hostname + std::string socketHostname; + + /// Zabbix sever port + int socketPort; /// Hostname as required by Zabbix protocol std::string hostname; + /// Sends Zabbix formatted message over TCP + /// \param message Zabbix message + void sendOverTcp(std::string&& message); + /// Prepares Zabbix protocol message - std::string metricToZabbix(const Metric& metric); + std::string createMessage(const boost::property_tree::ptree& dataArray); /// Converts timestamp into unix format /// \param timestamp chrono system_clock timestamp diff --git a/src/Collector.cxx b/src/Collector.cxx index 9fd631ee7..067e02f1c 100644 --- a/src/Collector.cxx +++ b/src/Collector.cxx @@ -156,34 +156,14 @@ void Collector::addDerivedMetric(std::string name, DerivedMetricMode mode) { void Collector::send(std::string measurement, std::vector&& metrics) { - // find InfluxDB index - size_t influxIndex = -1; for (auto& b: mBackends) { - if (dynamic_cast(b.get())) { - influxIndex = &b-&mBackends[0]; - } - } - // send single metric to InfluxDB - dynamic_cast( - mBackends[influxIndex].get())->sendMultiple(measurement, std::move(metrics) - ); - - // send multiple metric to all other backends (prepend metric name with measurement name) - for (auto& m : metrics) { - std::string tempName = m.getName(); - m.setName(measurement + "-" + m.getName()); - send(std::move(m), influxIndex); - m.setName(tempName); + b->sendMultiple(measurement, std::move(metrics)); } } -void Collector::send(Metric&& metric, std::size_t skipBackend) +void Collector::send(Metric&& metric) { - std::size_t index = 0; for (auto& b: mBackends) { - if (index++ == skipBackend) { - continue; - } b->send(metric); } if (mDerivedHandler->isRegistered(metric.getName())) {