Skip to content

Commit

Permalink
Sending multiple metrics (values) in a single measurement (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
awegrzyn authored Aug 3, 2017
1 parent 2ad7a98 commit 3af3d2d
Show file tree
Hide file tree
Showing 9 changed files with 124 additions and 17 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ set(EXAMPLES
examples/5-Benchmark.cxx
examples/6-DedicatedInstance.cxx
examples/7-Latency.cxx
examples/8-Multiple.cxx
)

foreach (example ${EXAMPLES})
Expand Down
22 changes: 17 additions & 5 deletions examples/5-Benchmark.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ int main(int argc, char *argv[]) {
("config", boost::program_options::value<std::string>()->required(), "Config file path")
("id", boost::program_options::value<std::string>(), "Instance ID")
("count", boost::program_options::value<int>(), "Number of metric bunches (x3)")
("multiple", boost::program_options::bool_switch()->default_value(false), "Sends multiple metrics per measurement")
;

boost::program_options::variables_map vm;
Expand Down Expand Up @@ -52,10 +53,21 @@ int main(int argc, char *argv[]) {
add = 1;
}

for (int i = 0; i <= count; i += add) {
Monitoring::Get().send({"string" + std::to_string(intDist(mt)), "stringMetric"});
Monitoring::Get().send({doubleDist(mt), "doubleMetric"});
Monitoring::Get().send({intDist(mt), "intMetric"});
std::this_thread::sleep_for(std::chrono::microseconds(sleep));
if (!vm["multiple"].as<bool>()) {
for (int i = 0; i <= count; i += add) {
Monitoring::Get().send({"string" + std::to_string(intDist(mt)), "stringMetric"});
Monitoring::Get().send({doubleDist(mt), "doubleMetric"});
Monitoring::Get().send({intDist(mt), "intMetric"});
std::this_thread::sleep_for(std::chrono::microseconds(sleep));
}
} else {
for (int i = 0; i <= count; i += add) {
Monitoring::Get().send("benchmarkMeasurement",{
{"string" + std::to_string(intDist(mt)), "stringMetric"},
{doubleDist(mt), "doubleMetric"},
{intDist(mt), "intMetric"}
});
std::this_thread::sleep_for(std::chrono::microseconds(sleep));
}
}
}
17 changes: 17 additions & 0 deletions examples/8-Multiple.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
///
/// \file 8-Multiple.cxx
/// \author Adam Wegrzynek <[email protected]>
///

#include "ExampleBoilerplate.cxx"
#include "Monitoring/MonitoringFactory.h"

using Monitoring = AliceO2::Monitoring::MonitoringFactory;

int main(int argc, char *argv[]) {

// configure monitoring (once per process), pass configuration path as parameter
Monitoring::Configure("file://" + GetConfigFromCmdLine(argc, argv));

Monitoring::Get().send("measurementName", {{10, "myMetricInt"}, {10.10, "myMetricFloat"}});
}
8 changes: 7 additions & 1 deletion include/Monitoring/Collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,13 @@ class Collector
/// Sends a metric to all avaliabes backends
/// If metric has been added to DerivedMetric the derived metric is calculated (see addDerivedMetric method)
/// \param metric r-value to metric object
void send(Metric&& metric);
void send(Metric&& metric, std::size_t skipBackend = -1);

/// Sends multiple metrics to as a single measurement
/// If it's not supported by backend it fallbacks into sending multiple metrics
/// \param name measurement name
/// \param metrics list of metrics
void send(std::string name, std::vector<Metric>&& metrics);

/// Sends a metric with tagset to all avaliabes backends
/// If metric has been added to DerivedMetric the derived metric is calculated (see addDerivedMetric method)
Expand Down
4 changes: 4 additions & 0 deletions include/Monitoring/Metric.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ class Metric
/// \return metric name
std::string getName() const;

/// Name setter
/// \param new name of the metric
void setName(std::string name);

/// Timestamp getter
/// \return metric timestamp
std::chrono::time_point<std::chrono::system_clock> getTimestamp() const;
Expand Down
43 changes: 34 additions & 9 deletions src/Backends/InfluxDB.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,26 @@ void InfluxDB::escape(std::string& escaped)
boost::replace_all(escaped, " ", "\\ ");
}

void InfluxDB::sendMultiple(std::string name, std::vector<Metric>&& metrics)
{
escape(name);
std::stringstream convert;
convert << name << "," << tagSet << " ";

for (const auto& metric : metrics) {
std::string value = boost::lexical_cast<std::string>(metric.getValue());
prepareValue(value, metric.getType());
convert << metric.getName() << "=" << value << ",";
}
convert.seekp(-1, std::ios_base::end);
convert << " " << convertTimestamp(metrics.back().getTimestamp());

try {
transport->send(convert.str());
} catch (MonitoringInternalException&) {
}
}

void InfluxDB::send(const Metric& metric)
{
std::string metricTags{};
Expand All @@ -57,15 +77,7 @@ void InfluxDB::send(const Metric& metric)
}

std::string value = boost::lexical_cast<std::string>(metric.getValue());
if (metric.getType() == MetricType::STRING) {
escape(value);
value.insert(value.begin(), '"');
value.insert(value.end(), '"');
}

if (metric.getType() == MetricType::INT) {
value.insert(value.end(), 'i');
}
prepareValue(value, metric.getType());
std::string name = metric.getName();
escape(name);

Expand All @@ -78,6 +90,19 @@ void InfluxDB::send(const Metric& metric)
}
}

void InfluxDB::prepareValue(std::string& value, int type)
{
if (type == MetricType::STRING) {
escape(value);
value.insert(value.begin(), '"');
value.insert(value.end(), '"');
}

if (type == MetricType::INT) {
value.insert(value.end(), 'i');
}
}

void InfluxDB::addGlobalTag(std::string name, std::string value)
{
escape(name); escape(value);
Expand Down
10 changes: 10 additions & 0 deletions src/Backends/InfluxDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ class InfluxDB final : public Backend
/// \param metric reference to metric object
void send(const Metric& metric) override;

/// Sends multiple values in single measurement
/// \param name measurement name
/// \param metrics list of metrics
void sendMultiple(std::string name, std::vector<Metric>&& metrics);

/// Adds tag
/// \param name tag name
/// \param value tag value
Expand All @@ -62,6 +67,11 @@ class InfluxDB final : public Backend
/// Escapes " ", "," and "=" characters
/// \param escaped string rerference to escape characters from
void escape(std::string& escaped);

/// Modifies values to Influx Line Protocol format
/// \param value reference to value
/// \param type type of the metric
void prepareValue(std::string& value, int type);
};

} // namespace Backends
Expand Down
31 changes: 29 additions & 2 deletions src/Collector.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ Collector::Collector(const std::string& configPath)
MonLogger::Get() << "InfluxDB/HTTP backend disabled" << MonLogger::End();
}
#endif

if (configFile->get<int>("Flume/enable").value_or(0) == 1) {
mBackends.emplace_back(std::make_unique<Backends::Flume>(
configFile->get<std::string>("Flume/hostname").value(),
Expand Down Expand Up @@ -154,9 +154,36 @@ void Collector::addDerivedMetric(std::string name, DerivedMetricMode mode) {
mDerivedHandler->registerMetric(name, mode);
}

void Collector::send(Metric&& metric)
void Collector::send(std::string measurement, std::vector<Metric>&& metrics)
{
// find InfluxDB index
size_t influxIndex = -1;
for (auto& b: mBackends) {
if (dynamic_cast<Backends::InfluxDB*>(b.get())) {
influxIndex = &b-&mBackends[0];
}
}
// send single metric to InfluxDB
dynamic_cast<Backends::InfluxDB*>(
mBackends[influxIndex].get())->sendMultiple(measurement, std::move(metrics)
);

// send multiple metric to all other backends (prepend metric name with measurement name)
for (auto& m : metrics) {
std::string tempName = m.getName();
m.setName(measurement + "-" + m.getName());
send(std::move(m), influxIndex);
m.setName(tempName);
}
}

void Collector::send(Metric&& metric, std::size_t skipBackend)
{
std::size_t index = 0;
for (auto& b: mBackends) {
if (index++ == skipBackend) {
continue;
}
b->send(metric);
}
if (mDerivedHandler->isRegistered(metric.getName())) {
Expand Down
5 changes: 5 additions & 0 deletions src/Metric.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ std::string Metric::getName() const
return mName;
}

void Metric::setName(std::string name)
{
mName = name;
}

Metric::Metric(int value, const std::string& name, std::chrono::time_point<std::chrono::system_clock> timestamp) :
mValue(value), mName(name), mTimestamp(timestamp)
{}
Expand Down

0 comments on commit 3af3d2d

Please sign in to comment.