Skip to content

Commit

Permalink
Zabbix: multiple metrics support; connection issue resolved (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
awegrzyn authored Aug 25, 2017
1 parent 3af3d2d commit f974016
Show file tree
Hide file tree
Showing 14 changed files with 123 additions and 49 deletions.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ Metric can be sent by one of the following ways:
+ `send(T value, std::string name)`
+ `sendTagged(T value, std::string name, std::vector<Tag>&& tags)`
+ `sendTimed(T value, std::string name, std::chrono::time_point<std::chrono::system_clock>& timestamp)`
3. Sending multiple metrics (only InfluxDB and Zabbix are supported, other backends fallback into sending metrics one by one)
+ `void send(std::string name, std::vector<Metric>&& metrics)`

## Derived metrics
The module can calculate derived metrics. To do so, use `addDerivedMetric(std::string name, DerivedMetricMode mode)` with one of two available modes:
Expand Down Expand Up @@ -303,6 +305,17 @@ collector->send(10, "myMetric");
collector->send({10, "myMetric"});
```
### Sending multiple metrics at once - examples/8-Multiple.cxx
```cpp
// configure monitoring (only once per process), pass configuration path as parameter
Monitoring::Configure("file:///home/awegrzyn/hackathon/Monitoring/examples/config-default.ini");
// Send two metrics at once as a single measurement
Monitoring::Get().send("measurementName", {
{10, "myMetricInt"},
{10.10, "myMetricFloat"}
});
```

## System monitoring, server-side backends installation and configuration
This guide explains manual installation. For `ansible` deployment see [AliceO2Group/system-configuration](https://gitlab.cern.ch/AliceO2Group/system-configuration/tree/master/ansible) gitlab repo.
Expand Down
3 changes: 3 additions & 0 deletions include/Monitoring/Backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ class Backend
/// Sends metric via backend
virtual void send(const Metric& metric) = 0;

/// Sends multiple metrics (if supported), otherwise falls back into sending single metrics
virtual void sendMultiple(std::string measurement, std::vector<Metric>&& metrics) = 0;

/// Sets a tag
virtual void addGlobalTag(std::string name, std::string value) = 0;
};
Expand Down
2 changes: 1 addition & 1 deletion include/Monitoring/Collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class Collector
/// Sends a metric to all avaliabes backends
/// If metric has been added to DerivedMetric the derived metric is calculated (see addDerivedMetric method)
/// \param metric r-value to metric object
void send(Metric&& metric, std::size_t skipBackend = -1);
void send(Metric&& metric);

/// Sends multiple metrics to as a single measurement
/// If it's not supported by backend it fallbacks into sending multiple metrics
Expand Down
10 changes: 10 additions & 0 deletions src/Backends/ApMonBackend.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ void ApMonBackend::addGlobalTag(std::string name, std::string value)
entity += value;
}

void ApMonBackend::sendMultiple(std::string measurement, std::vector<Metric>&& metrics)
{
for (auto& m : metrics) {
std::string tempName = m.getName();
m.setName(measurement + "-" + m.getName());
send(m);
m.setName(tempName);
}
}

void ApMonBackend::send(const Metric& metric)
{
std::string name = metric.getName();
Expand Down
6 changes: 6 additions & 0 deletions src/Backends/ApMonBackend.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ class ApMonBackend final : public Backend
/// \param metric reference to metric object:
void send(const Metric& metric) override;

/// Sends multiple metric in single packet
/// Not supported by the backend therefore it falls back to sending metric one by one
/// \param name measurement name
/// \param metrics list of metrics
void sendMultiple(std::string measurement, std::vector<Metric>&& metrics) override;

/// Extends entity value
/// \param name tag name (unused)
/// \param value tag value that is concatenated to entity string
Expand Down
10 changes: 10 additions & 0 deletions src/Backends/Flume.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ std::string Flume::metricToJson(const Metric& metric)
return s;
}

void Flume::sendMultiple(std::string measurement, std::vector<Metric>&& metrics)
{
for (auto& m : metrics) {
std::string tempName = m.getName();
m.setName(measurement + "-" + m.getName());
send(m);
m.setName(tempName);
}
}

void Flume::send(const Metric& metric)
{
mTransport->send(metricToJson(metric));
Expand Down
9 changes: 8 additions & 1 deletion src/Backends/Flume.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class Flume final : public Backend
/// Default destructor
~Flume() = default;

// Convert timestamp to unsigned long as required by Flume
/// Convert timestamp to unsigned long as required by Flume
/// \param chrono time_point timestamp
/// \return timestamp in nanoseconds
inline unsigned long convertTimestamp(const std::chrono::time_point<std::chrono::system_clock>& timestamp);
Expand All @@ -46,6 +46,13 @@ class Flume final : public Backend
/// \param metric reference to metric object
void send(const Metric& metric) override;

/// Sends multiple metric in single packet
/// Not supported by the backend therefore it falls back to sending metric one by one
/// TODO: changed required in Flume Source
/// \param name measurement name
/// \param metrics list of metrics
void sendMultiple(std::string measurement, std::vector<Metric>&& metrics) override;

/// Adds tag
/// \param name tag name
/// \param value tag value
Expand Down
6 changes: 3 additions & 3 deletions src/Backends/InfluxDB.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ void InfluxDB::escape(std::string& escaped)
boost::replace_all(escaped, " ", "\\ ");
}

void InfluxDB::sendMultiple(std::string name, std::vector<Metric>&& metrics)
void InfluxDB::sendMultiple(std::string measurement, std::vector<Metric>&& metrics)
{
escape(name);
escape(measurement);
std::stringstream convert;
convert << name << "," << tagSet << " ";
convert << measurement << "," << tagSet << " ";

for (const auto& metric : metrics) {
std::string value = boost::lexical_cast<std::string>(metric.getValue());
Expand Down
2 changes: 1 addition & 1 deletion src/Backends/InfluxDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class InfluxDB final : public Backend
/// Sends multiple values in single measurement
/// \param name measurement name
/// \param metrics list of metrics
void sendMultiple(std::string name, std::vector<Metric>&& metrics);
void sendMultiple(std::string measurement, std::vector<Metric>&& metrics);

/// Adds tag
/// \param name tag name
Expand Down
10 changes: 10 additions & 0 deletions src/Backends/InfoLoggerBackend.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ void InfoLoggerBackend::addGlobalTag(std::string name, std::string value)
tagString += name + "=" + value;
}

void InfoLoggerBackend::sendMultiple(std::string measurement, std::vector<Metric>&& metrics)
{
for (auto& m : metrics) {
std::string tempName = m.getName();
m.setName(measurement + "-" + m.getName());
send(m);
m.setName(tempName);
}
}

void InfoLoggerBackend::send(const Metric& metric)
{
std::string metricTags{};
Expand Down
8 changes: 7 additions & 1 deletion src/Backends/InfoLoggerBackend.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,16 @@ class InfoLoggerBackend final : public Backend
/// Default destructor
~InfoLoggerBackend() = default;

/// Sends metric to InfluxDB
/// Sends metric to InfoLogger library
/// \param metric reference to metric object
void send(const Metric& metric) override;

/// Sends multiple metric at once
/// Not supported by the backend therefore it falls back to sending metric one by one
/// \param name measurement name
/// \param metrics list of metrics
void sendMultiple(std::string measurement, std::vector<Metric>&& metrics) override;

/// Adds tag
/// \param name tag name
/// \param value tag value
Expand Down
50 changes: 33 additions & 17 deletions src/Backends/Zabbix.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

#include "Zabbix.h"
#include <boost/lexical_cast.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <string>
#include "../Transports/TCP.h"
#include "../Exceptions/MonitoringInternalException.h"
Expand All @@ -20,8 +19,8 @@ namespace Backends
{

Zabbix::Zabbix(const std::string &hostname, int port)
: socketHostname(hostname), socketPort(port)
{
transport = std::make_unique<Transports::TCP>(hostname, port);
MonLogger::Get() << "Zabbix/TCP backend initialized"
<< " ("<< hostname << ":" << port << ")" << MonLogger::End();
}
Expand All @@ -36,29 +35,46 @@ inline std::string Zabbix::convertTimestamp(const std::chrono::time_point<std::c
return converted;
}

std::string Zabbix::metricToZabbix(const Metric& metric)
void Zabbix::send(const Metric& metric)
{
// create JSON payload
boost::property_tree::ptree request, dataArray, data;
boost::property_tree::ptree dataArray, data;
data.put<std::string>("host", hostname);
data.put<std::string>("key", metric.getName());
data.put<std::string>("value", boost::lexical_cast<std::string>(metric.getValue()));
data.put<std::string>("clock", convertTimestamp(metric.getTimestamp()));

dataArray.push_back(std::make_pair("", data));
sendOverTcp(createMessage(dataArray));
}

void Zabbix::sendMultiple(std::string measurement, std::vector<Metric>&& metrics)
{
boost::property_tree::ptree dataArray;
for (auto& metric : metrics) {
boost::property_tree::ptree data;
data.put<std::string>("host", hostname);
data.put<std::string>("key", metric.getName());
data.put<std::string>("value", boost::lexical_cast<std::string>(metric.getValue()));
data.put<std::string>("clock", convertTimestamp(metric.getTimestamp()));
dataArray.push_back(std::make_pair("", data));
}
sendOverTcp(createMessage(dataArray));
}

std::string Zabbix::createMessage(const boost::property_tree::ptree& dataArray) {
boost::property_tree::ptree request;
request.put<std::string>("request", "sender data");
request.add_child("data", dataArray);

std::stringstream ss;
std::stringstream ss;
write_json(ss, request);

std::string noWhiteSpaces = ss.str();
noWhiteSpaces.erase(std::remove_if( noWhiteSpaces.begin(), noWhiteSpaces.end(),
[](char c){ return (c =='\r' || c =='\t' || c == ' ' || c == '\n');}), noWhiteSpaces.end() );
noWhiteSpaces.insert(18, " ");

// prepare Zabbix header
uint32_t length = noWhiteSpaces.length();
std::string message = ss.str();
message.erase(std::remove_if( message.begin(), message.end(),
[](char c){ return (c =='\r' || c =='\t' || c == ' ' || c == '\n');}), message.end() );
message.insert(18, " ");

uint32_t length = message.length();
std::vector<unsigned char> header = {
'Z', 'B', 'X', 'D', '\x01',
static_cast<unsigned char>(length & 0xFF),
Expand All @@ -67,13 +83,13 @@ std::string Zabbix::metricToZabbix(const Metric& metric)
static_cast<unsigned char>((length >> 24) & 0x000000FF),
'\x00','\x00','\x00','\x00'
};

return std::string(header.begin(), header.end()) + noWhiteSpaces;
return std::string(header.begin(), header.end()) + message;
}

void Zabbix::send(const Metric& metric) {
void Zabbix::sendOverTcp(std::string&& message) {
try {
transport->send(metricToZabbix(metric));
std::unique_ptr<Transports::TCP> transport = std::make_unique<Transports::TCP>(socketHostname, socketPort);
transport->send(std::move(message));
} catch (MonitoringInternalException&) {
}
}
Expand Down
19 changes: 16 additions & 3 deletions src/Backends/Zabbix.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "Monitoring/Backend.h"
#include "../Transports/TCP.h"
#include "../MonLogger.h"
#include <boost/property_tree/json_parser.hpp>
#include <chrono>
#include <string>

Expand Down Expand Up @@ -39,20 +40,32 @@ class Zabbix final : public Backend
/// \param metric reference to metric object
void send(const Metric& metric) override;

/// Sends multiple metric in single packet
/// \param name measurement name
/// \param metrics list of metrics
void sendMultiple(std::string measurement, std::vector<Metric>&& metrics) override;

/// Adds tag
/// \param name tag name
/// \param value tag value
void addGlobalTag(std::string name, std::string value) override;

private:
/// TCP transport
std::unique_ptr<Transports::TCP> transport;
/// Zabbix server hostname
std::string socketHostname;

/// Zabbix sever port
int socketPort;

/// Hostname as required by Zabbix protocol
std::string hostname;

/// Sends Zabbix formatted message over TCP
/// \param message Zabbix message
void sendOverTcp(std::string&& message);

/// Prepares Zabbix protocol message
std::string metricToZabbix(const Metric& metric);
std::string createMessage(const boost::property_tree::ptree& dataArray);

/// Converts timestamp into unix format
/// \param timestamp chrono system_clock timestamp
Expand Down
24 changes: 2 additions & 22 deletions src/Collector.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -156,34 +156,14 @@ void Collector::addDerivedMetric(std::string name, DerivedMetricMode mode) {

void Collector::send(std::string measurement, std::vector<Metric>&& metrics)
{
// find InfluxDB index
size_t influxIndex = -1;
for (auto& b: mBackends) {
if (dynamic_cast<Backends::InfluxDB*>(b.get())) {
influxIndex = &b-&mBackends[0];
}
}
// send single metric to InfluxDB
dynamic_cast<Backends::InfluxDB*>(
mBackends[influxIndex].get())->sendMultiple(measurement, std::move(metrics)
);

// send multiple metric to all other backends (prepend metric name with measurement name)
for (auto& m : metrics) {
std::string tempName = m.getName();
m.setName(measurement + "-" + m.getName());
send(std::move(m), influxIndex);
m.setName(tempName);
b->sendMultiple(measurement, std::move(metrics));
}
}

void Collector::send(Metric&& metric, std::size_t skipBackend)
void Collector::send(Metric&& metric)
{
std::size_t index = 0;
for (auto& b: mBackends) {
if (index++ == skipBackend) {
continue;
}
b->send(metric);
}
if (mDerivedHandler->isRegistered(metric.getName())) {
Expand Down

0 comments on commit f974016

Please sign in to comment.