Skip to content

Commit

Permalink
MARKETINFRA-473 issue #24 Added instant metrics mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikolay Firov committed Dec 25, 2017
1 parent 751be53 commit d3235ca
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import ru.yandex.market.graphouse.Metric;
import ru.yandex.market.graphouse.monitoring.Monitoring;
import ru.yandex.market.graphouse.monitoring.MonitoringUnit;
import ru.yandex.market.graphouse.statistics.IStatisticsService;
import ru.yandex.market.graphouse.statistics.InstantMetric;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -65,9 +67,13 @@ public class MetricCacher implements Runnable, InitializingBean {

private ExecutorService executorService;

public MetricCacher(JdbcTemplate clickHouseJdbcTemplate, Monitoring monitoring) {
public MetricCacher(JdbcTemplate clickHouseJdbcTemplate, Monitoring monitoring,
IStatisticsService statisticsService) {
this.clickHouseJdbcTemplate = clickHouseJdbcTemplate;
this.monitoring = monitoring;

statisticsService.registerInstantMetric(InstantMetric.METRIC_CACHE_QUEUE_SIZE,
() -> (double) this.metricQueue.size());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
},
ignoreResourceNotFound = true
)
@Import({DbConfig.class, MetricsConfig.class, ServerConfig.class, StatisticsConfig.class})
@Import({DbConfig.class, MetricsConfig.class, ServerConfig.class,
StatisticsConfig.class, StatisticsCountersConfig.class})
public class GraphouseConfig {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.jdbc.core.JdbcTemplate;
import ru.yandex.market.graphouse.MetricValidator;
import ru.yandex.market.graphouse.cacher.MetricCacher;
Expand All @@ -14,12 +15,14 @@
import ru.yandex.market.graphouse.retention.RetentionProvider;
import ru.yandex.market.graphouse.search.MetricSearch;
import ru.yandex.market.graphouse.server.MetricFactory;
import ru.yandex.market.graphouse.statistics.IStatisticsService;

/**
* @author Vlad Vinogradov <a href="mailto:[email protected]"></a>
* @date 10.11.16
*/
@Configuration
@Import({StatisticsConfig.class})
public class MetricsConfig {

@Autowired
Expand All @@ -28,6 +31,9 @@ public class MetricsConfig {
@Autowired
private JdbcTemplate clickHouseJdbcTemplate;

@Autowired
private IStatisticsService statisticsService;

@Value("${graphouse.clickhouse.data-table}")
private String graphiteDataTable;

Expand Down Expand Up @@ -75,7 +81,7 @@ public MetricValidator metricValidator() {

@Bean
public MetricCacher metricCacher() {
return new MetricCacher(clickHouseJdbcTemplate, monitoring);
return new MetricCacher(clickHouseJdbcTemplate, monitoring, statisticsService);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,28 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import ru.yandex.market.graphouse.cacher.MetricCacher;
import ru.yandex.market.graphouse.search.MetricSearch;
import ru.yandex.market.graphouse.statistics.IStatisticsService;
import ru.yandex.market.graphouse.statistics.StatisticsCounter;
import ru.yandex.market.graphouse.statistics.StatisticsFlushFrequencyConfig;
import ru.yandex.market.graphouse.statistics.StatisticsService;

import java.util.List;
import java.util.stream.Collectors;

/**
* @author Nikolay Firov <a href="mailto:[email protected]"></a>
* @date 22.12.17
*/
@Configuration
@Import({MetricsConfig.class})
public class StatisticsConfig {
@Value("${graphouse.statistics.flush.threads}")
private int numberOfFlushThreads;

@Value("${graphouse.statistics.metrics_to_flush_frequency}")
private String metricsFlushFrequency;


@Value("${graphouse.statistics.metric_prefix}")
private String metricsPrefix;

@Bean
public StatisticsFlushFrequencyConfig statisticsFlushFrequencyConfig() {
return new StatisticsFlushFrequencyConfig(metricsFlushFrequency);
}

@Bean
public IStatisticsService statisticsService(StatisticsFlushFrequencyConfig config, MetricSearch metricSearch,
MetricCacher metricCacher) {
List<StatisticsCounter> counters = config.getMetricNameToFlushPeriodInSeconds().entrySet()
.stream()
.map(e -> new StatisticsCounter(
String.format("%s.%s", metricsPrefix, e.getKey()), e.getValue(), metricSearch, metricCacher)
)
.collect(Collectors.toList());

return new StatisticsService(counters, numberOfFlushThreads);
public StatisticsService statisticsService() {
return new StatisticsService(numberOfFlushThreads);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package ru.yandex.market.graphouse.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import ru.yandex.market.graphouse.cacher.MetricCacher;
import ru.yandex.market.graphouse.search.MetricSearch;
import ru.yandex.market.graphouse.statistics.StatisticsCounter;
import ru.yandex.market.graphouse.statistics.StatisticsFlushFrequencyConfig;
import ru.yandex.market.graphouse.statistics.StatisticsService;

import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* @author Nikolay Firov <a href="mailto:[email protected]"></a>
* @date 22.12.17
*/
@Configuration
@Import({StatisticsConfig.class})
public class StatisticsCountersConfig {
@Autowired
StatisticsService statisticsService;

@Autowired
StatisticsFlushFrequencyConfig config;

@Autowired
MetricSearch metricSearch;

@Autowired
MetricCacher metricCacher;

@Value("${graphouse.statistics.metric_prefix}")
private String metricsPrefix;

@PostConstruct
public void initialize() {
List<StatisticsCounter> counters = config.getMetricNameToFlushPeriodInSeconds().entrySet()
.stream()
.map(this::createCounter)
.collect(Collectors.toList());

statisticsService.initialize(counters);
}

private StatisticsCounter createCounter(Map.Entry<String, Integer> metricToFlushPeriod) {
return new StatisticsCounter(
String.format("%s.%s", metricsPrefix, metricToFlushPeriod.getKey()), metricToFlushPeriod.getValue(),
metricSearch, metricCacher);
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package ru.yandex.market.graphouse.statistics;

import java.util.function.Supplier;

/**
* @author Nikolay Firov <a href="mailto:[email protected]"></a>
* @date 25.12.17
*/
public interface IStatisticsService {
void accumulateMetric(AccumulatedMetric metric, double value);

void registerInstantMetric(InstantMetric metric, Supplier<Double> supplier);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package ru.yandex.market.graphouse.statistics;

/**
* @author Nikolay Firov <a href="mailto:[email protected]"></a>
* @date 22.12.17
*/
public enum InstantMetric {
METRIC_CACHE_QUEUE_SIZE
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* @author Nikolay Firov <a href="mailto:[email protected]"></a>
Expand All @@ -28,7 +30,8 @@ public class StatisticsCounter {
private final MetricCacher metricCacher;
private final MetricSearch metricSearch;

private final Map<AccumulatedMetric, MetricDescription> metricsDescriptions = new HashMap<>();
private final Map<AccumulatedMetric, MetricDescription> accumulatedMetricsDescriptions = new HashMap<>();
private final Map<InstantMetric, MetricDescription> instantMetricsDescriptions = new HashMap<>();
private final Map<AccumulatedMetric, AtomicDouble> metricsCounters = new HashMap<>();

public StatisticsCounter(String prefix, Integer flushPeriodSeconds,
Expand All @@ -44,36 +47,69 @@ public StatisticsCounter(String prefix, Integer flushPeriodSeconds,
public void initialize() {
for (AccumulatedMetric metric : AccumulatedMetric.values()) {
String name = String.format("%s.accumulated.%s", prefix, metric.name().toLowerCase());
MetricDescription description = this.metricSearch.add(name);
loadMetric(name, description -> accumulatedMetricsDescriptions.put(metric, description));
}

if (description != null) {
metricsDescriptions.put(metric, description);
log.info("Statistics metric loaded " + name);
} else {
log.warn("Failed to load metric " + name);
}
for (InstantMetric metric : InstantMetric.values()) {
String name = String.format("%s.instant.%s", prefix, metric.name().toLowerCase());
loadMetric(name, description -> instantMetricsDescriptions.put(metric, description));
}
}

public void accumulateMetric(AccumulatedMetric metric, double value) {
this.metricsCounters.get(metric).addAndGet(value);
}

public void flush() {
List<Metric> metrics = new ArrayList<>(metricsCounters.size());
public void flush(Map<InstantMetric, Supplier<Double>> instantMetricsSuppliers) {
int timestampSeconds = (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());

List<Metric> metrics = getAccumulatedMetrics(timestampSeconds);
metrics.addAll(getInstantMetrics(instantMetricsSuppliers, timestampSeconds));

this.metricCacher.submitMetrics(metrics);
}

private void loadMetric(String name, Consumer<MetricDescription> save) {
MetricDescription description = this.metricSearch.add(name);

if (description != null) {
save.accept(description);
log.info("Statistics metric loaded " + name);
} else {
log.warn("Failed to load metric " + name);
}
}

private List<Metric> getInstantMetrics(Map<InstantMetric, Supplier<Double>> instantMetricsSuppliers,
int timestampSeconds) {
List<Metric> metrics = new ArrayList<>(instantMetricsSuppliers.size());

for (Map.Entry<AccumulatedMetric, AtomicDouble> counter : metricsCounters.entrySet()) {
for (Map.Entry<InstantMetric, Supplier<Double>> entry : instantMetricsSuppliers.entrySet()) {
MetricDescription description = this.instantMetricsDescriptions.get(entry.getKey());
if (description == null) {
continue;
}

metrics.add(new Metric(description, timestampSeconds, entry.getValue().get(), timestampSeconds));
}

return metrics;
}

private List<Metric> getAccumulatedMetrics(int timestampSeconds) {
List<Metric> metrics = new ArrayList<>(this.metricsCounters.size());

for (Map.Entry<AccumulatedMetric, AtomicDouble> counter : this.metricsCounters.entrySet()) {
Double value = counter.getValue().getAndSet(0);
MetricDescription description = metricsDescriptions.get(counter.getKey());
MetricDescription description = this.accumulatedMetricsDescriptions.get(counter.getKey());
if (description == null) {
continue;
}

int timestampSeconds = (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
metrics.add(new Metric(description, timestampSeconds, value, timestampSeconds));
}

this.metricCacher.submitMetrics(metrics);
return metrics;
}

public int getFlushPeriodSeconds() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,34 +1,44 @@
package ru.yandex.market.graphouse.statistics;

import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
* @author Nikolay Firov <a href="mailto:[email protected]"></a>
* @date 22.12.17
*/
public class StatisticsService implements IStatisticsService {
private final List<StatisticsCounter> counters;
private List<StatisticsCounter> counters;
private final ScheduledExecutorService scheduler;
private final Map<InstantMetric, Supplier<Double>> instantMetricsSuppliers = new ConcurrentHashMap<>();

public StatisticsService(List<StatisticsCounter> counters, int numberOfThreads) {
this.counters = counters;
public StatisticsService(int numberOfThreads) {
this.scheduler = Executors.newScheduledThreadPool(numberOfThreads);
}

@PostConstruct
public void initialize() {
public void initialize(List<StatisticsCounter> counters) {
this.counters = counters;
this.counters.forEach(StatisticsCounter::initialize);
this.counters.forEach(agent -> this.scheduler.scheduleAtFixedRate(
agent::flush, agent.getFlushPeriodSeconds(), agent.getFlushPeriodSeconds(), TimeUnit.SECONDS)
this.counters.forEach(counter ->
this.scheduler.scheduleAtFixedRate(
() -> counter.flush(instantMetricsSuppliers), counter.getFlushPeriodSeconds(),
counter.getFlushPeriodSeconds(), TimeUnit.SECONDS
)
);
}

@Override
public void accumulateMetric(AccumulatedMetric metric, double value) {
this.counters.forEach(x -> x.accumulateMetric(metric, value));
}

@Override
public void registerInstantMetric(InstantMetric metric, Supplier<Double> supplier) {
instantMetricsSuppliers.put(metric, supplier);
}
}

0 comments on commit d3235ca

Please sign in to comment.