Skip to content

Commit

Permalink
Make the allowlist reconfigurable
Browse files Browse the repository at this point in the history
Signed-off-by: Mickael Maison <[email protected]>
  • Loading branch information
mimaison committed Nov 5, 2024
1 parent 1425a3c commit 5c239a5
Show file tree
Hide file tree
Showing 23 changed files with 404 additions and 252 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.prometheus.metrics.model.registry.PrometheusRegistry;
import io.prometheus.metrics.model.snapshots.PrometheusNaming;
import io.strimzi.kafka.metrics.collector.MetricWrapper;
import io.strimzi.kafka.metrics.collector.PrometheusCollector;
import io.strimzi.kafka.metrics.collector.kafka.KafkaCollector;
import io.strimzi.kafka.metrics.collector.kafka.KafkaMetricWrapper;
import io.strimzi.kafka.metrics.http.HttpServers;
import io.strimzi.kafka.metrics.kafka.KafkaCollector;
import io.strimzi.kafka.metrics.kafka.KafkaMetricWrapper;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -33,8 +34,7 @@ public class KafkaPrometheusMetricsReporter implements MetricsReporter {

private final PrometheusRegistry registry;
private final KafkaCollector kafkaCollector;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method
private PrometheusMetricsReporterConfig config;
private final PrometheusCollector prometheusCollector;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method
private Optional<HttpServers.ServerCounter> httpServer;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the contextChange method
Expand All @@ -45,18 +45,21 @@ public class KafkaPrometheusMetricsReporter implements MetricsReporter {
*/
public KafkaPrometheusMetricsReporter() {
registry = PrometheusRegistry.defaultRegistry;
kafkaCollector = KafkaCollector.getCollector(PrometheusCollector.register(registry));
prometheusCollector = PrometheusCollector.register(registry);
kafkaCollector = KafkaCollector.getCollector(prometheusCollector);
}

// for testing
KafkaPrometheusMetricsReporter(PrometheusRegistry registry, KafkaCollector kafkaCollector) {
KafkaPrometheusMetricsReporter(PrometheusRegistry registry, KafkaCollector kafkaCollector, PrometheusCollector prometheusCollector) {
this.registry = registry;
this.kafkaCollector = kafkaCollector;
this.prometheusCollector = prometheusCollector;
}

@Override
public void configure(Map<String, ?> map) {
config = new PrometheusMetricsReporterConfig(map, registry);
PrometheusMetricsReporterConfig config = new PrometheusMetricsReporterConfig(map, registry);
prometheusCollector.updateAllowlist(config.allowlist());
httpServer = config.startHttpServer();
LOG.debug("KafkaPrometheusMetricsReporter configured with {}", config);
}
Expand All @@ -70,12 +73,8 @@ public void init(List<KafkaMetric> metrics) {

public void metricChange(KafkaMetric metric) {
String prometheusName = KafkaMetricWrapper.prometheusName(prefix, metric.metricName());
if (!config.isAllowed(prometheusName)) {
LOG.trace("Ignoring metric {} as it does not match the allowlist", prometheusName);
} else {
MetricWrapper metricWrapper = new KafkaMetricWrapper(prometheusName, metric, metric.metricName().name());
kafkaCollector.addMetric(metric.metricName(), metricWrapper);
}
MetricWrapper metricWrapper = new KafkaMetricWrapper(prometheusName, metric, metric.metricName().name());
kafkaCollector.addMetric(metric.metricName(), metricWrapper);
}

@Override
Expand All @@ -90,15 +89,19 @@ public void close() {

@Override
public void reconfigure(Map<String, ?> configs) {
PrometheusMetricsReporterConfig newConfig = new PrometheusMetricsReporterConfig(configs, null);
prometheusCollector.updateAllowlist(newConfig.allowlist());
LOG.debug("KafkaPrometheusMetricsReporter reconfigured with {}", newConfig);
}

@Override
public void validateReconfiguration(Map<String, ?> configs) throws ConfigException {
new PrometheusMetricsReporterConfig(configs, null);
}

@Override
public Set<String> reconfigurableConfigs() {
return Collections.emptySet();
return PrometheusMetricsReporterConfig.RECONFIGURABLES;
}

@Override
Expand Down
22 changes: 0 additions & 22 deletions src/main/java/io/strimzi/kafka/metrics/MetricsCollector.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -67,6 +69,11 @@ public class PrometheusMetricsReporterConfig extends AbstractConfig {
.define(ALLOWLIST_CONFIG, ConfigDef.Type.LIST, ALLOWLIST_CONFIG_DEFAULT, ConfigDef.Importance.HIGH, ALLOWLIST_CONFIG_DOC)
.define(LISTENER_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, LISTENER_ENABLE_CONFIG_DEFAULT, ConfigDef.Importance.HIGH, LISTENER_ENABLE_CONFIG_DOC);

/**
* The list of configurations that are reconfigurable at runtime.
*/
public static final Set<String> RECONFIGURABLES = Collections.singleton(ALLOWLIST_CONFIG);

private final Listener listener;
private final boolean listenerEnabled;
private final Pattern allowlist;
Expand All @@ -87,16 +94,11 @@ public PrometheusMetricsReporterConfig(Map<?, ?> props, PrometheusRegistry regis
}

/**
* Check if a metric is allowed.
*
* @param name the name of the metric.
* @return true if the metric is allowed, false otherwise.
* Compile the allowlist into a {@link Pattern}.
* @param allowlist the list of regex patterns
* @return the Pattern for the allowlist.
*/
public boolean isAllowed(String name) {
return allowlist.matcher(name).matches();
}

private Pattern compileAllowlist(List<String> allowlist) {
public static Pattern compileAllowlist(List<String> allowlist) {
for (String entry : allowlist) {
try {
Pattern.compile(entry);
Expand All @@ -109,8 +111,15 @@ private Pattern compileAllowlist(List<String> allowlist) {
}

/**
* Gets the listener.
*
* Get the allowlist.
* @return the allowlist.
*/
public Pattern allowlist() {
return allowlist;
}

/**
* Get the listener.
* @return the listener.
*/
public String listener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import com.yammer.metrics.core.MetricsRegistryListener;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.prometheus.metrics.model.registry.PrometheusRegistry;
import io.strimzi.kafka.metrics.yammer.YammerCollector;
import io.strimzi.kafka.metrics.yammer.YammerMetricWrapper;
import io.strimzi.kafka.metrics.collector.MetricWrapper;
import io.strimzi.kafka.metrics.collector.PrometheusCollector;
import io.strimzi.kafka.metrics.collector.yammer.YammerCollector;
import io.strimzi.kafka.metrics.collector.yammer.YammerMetricWrapper;
import kafka.metrics.KafkaMetricsReporter;
import kafka.utils.VerifiableProperties;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
Expand All @@ -30,6 +32,7 @@ public class YammerPrometheusMetricsReporter implements KafkaMetricsReporter, Me

private final PrometheusRegistry registry;
private final YammerCollector yammerCollector;
private final PrometheusCollector prometheusCollector;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the init method
/* test */ PrometheusMetricsReporterConfig config;

Expand All @@ -38,13 +41,15 @@ public class YammerPrometheusMetricsReporter implements KafkaMetricsReporter, Me
*/
public YammerPrometheusMetricsReporter() {
registry = PrometheusRegistry.defaultRegistry;
yammerCollector = YammerCollector.getCollector(PrometheusCollector.register(registry));
prometheusCollector = PrometheusCollector.register(registry);
yammerCollector = YammerCollector.getCollector(prometheusCollector);
}

// for testing
YammerPrometheusMetricsReporter(PrometheusRegistry registry, PrometheusCollector prometheusCollector) {
this.registry = registry;
yammerCollector = YammerCollector.getCollector(prometheusCollector);
this.prometheusCollector = prometheusCollector;
}

@Override
Expand All @@ -53,18 +58,15 @@ public void init(VerifiableProperties props) {
for (MetricsRegistry yammerRegistry : Arrays.asList(KafkaYammerMetrics.defaultRegistry(), Metrics.defaultRegistry())) {
yammerRegistry.addListener(this);
}
yammerCollector.updateAllowlist(config.allowlist());
LOG.debug("YammerPrometheusMetricsReporter configured with {}", config);
}

@Override
public void onMetricAdded(MetricName name, Metric metric) {
String prometheusName = YammerMetricWrapper.prometheusName(name);
if (!config.isAllowed(prometheusName)) {
LOG.trace("Ignoring metric {} as it does not match the allowlist", prometheusName);
} else {
MetricWrapper metricWrapper = new YammerMetricWrapper(prometheusName, name.getScope(), metric, name.getName());
yammerCollector.addMetric(name, metricWrapper);
}
MetricWrapper metricWrapper = new YammerMetricWrapper(prometheusName, name.getScope(), metric, name.getName());
yammerCollector.addMetric(name, metricWrapper);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.kafka.metrics;
package io.strimzi.kafka.metrics.collector;

import io.prometheus.metrics.model.snapshots.CounterSnapshot;
import io.prometheus.metrics.model.snapshots.GaugeSnapshot;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.kafka.metrics;
package io.strimzi.kafka.metrics.collector;

import io.prometheus.metrics.model.snapshots.Labels;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.kafka.metrics.collector;

import io.prometheus.metrics.model.snapshots.MetricSnapshot;
import io.strimzi.kafka.metrics.PrometheusMetricsReporterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;

/**
* Abstract class for both Kafka and Yammer collectors
*/
public abstract class MetricsCollector {

private static final Logger LOG = LoggerFactory.getLogger(MetricsCollector.class);

private final Map<Object, MetricWrapper> allowedMetrics = new ConcurrentHashMap<>();
private final Map<Object, MetricWrapper> otherMetrics = new ConcurrentHashMap<>();
/* test */ Pattern allowlist = Pattern.compile(PrometheusMetricsReporterConfig.ALLOWLIST_CONFIG_DEFAULT);

/**
* Collect all the metrics added to this Collector
*
* @return the list of metrics of this collector
*/
public abstract List<MetricSnapshot<?>> collect();

/**
* Update the allowlist used by the Collector to filter metrics
* @param allowlist the new allowlist Pattern
*/
public void updateAllowlist(Pattern allowlist) {
this.allowlist = allowlist;
update();
}

/**
* Add a metric to be collected.
* @param name The name of the metric to add.
* @param metric The metric to add.
*/
public void addMetric(Object name, MetricWrapper metric) {
if (allowlist.matcher(metric.prometheusName()).matches()) {
allowedMetrics.put(name, metric);
} else {
LOG.trace("Ignoring metric {} as it does not match the allowlist", metric.prometheusName());
otherMetrics.put(name, metric);
}
}

/**
* Remove a metric from collection.
* @param name The name of metric to remove.
*/
public void removeMetric(Object name) {
allowedMetrics.remove(name);
otherMetrics.remove(name);
}

/**
* Retrieve the allowed metrics
* @return the collection of allowed MetricWrapper
*/
public Collection<MetricWrapper> allowedMetrics() {
return allowedMetrics.values();
}

private void update() {
Map<Object, MetricWrapper> newAllowedMetrics = new HashMap<>();
for (Map.Entry<Object, MetricWrapper> entry : otherMetrics.entrySet()) {
String name = entry.getValue().prometheusName();
if (allowlist.matcher(name).matches()) {
newAllowedMetrics.put(entry.getKey(), entry.getValue());
otherMetrics.remove(entry.getKey());
}
}
for (Map.Entry<Object, MetricWrapper> entry : allowedMetrics.entrySet()) {
String name = entry.getValue().prometheusName();
if (!allowlist.matcher(name).matches()) {
otherMetrics.put(entry.getKey(), entry.getValue());
allowedMetrics.remove(entry.getKey());
}
}
allowedMetrics.putAll(newAllowedMetrics);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.kafka.metrics;
package io.strimzi.kafka.metrics.collector;

import io.prometheus.metrics.instrumentation.jvm.JvmMetrics;
import io.prometheus.metrics.model.registry.MultiCollector;
import io.prometheus.metrics.model.registry.PrometheusRegistry;
import io.prometheus.metrics.model.snapshots.MetricSnapshot;
import io.prometheus.metrics.model.snapshots.MetricSnapshots;
import io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter;
import io.strimzi.kafka.metrics.YammerPrometheusMetricsReporter;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;

/**
* Prometheus Collector to store and export metrics retrieved by {@link KafkaPrometheusMetricsReporter}
Expand All @@ -26,8 +29,6 @@ public class PrometheusCollector implements MultiCollector {
// At runtime this should contain at most one instance of KafkaCollector and one instance of YammerCollector
private final List<MetricsCollector> collectors = new ArrayList<>();

/* for testing */ PrometheusCollector() { }

/**
* Register this Collector with the provided Prometheus registry
* @param registry The Prometheus registry
Expand Down Expand Up @@ -62,4 +63,14 @@ public MetricSnapshots collect() {
}
return new MetricSnapshots(snapshots);
}

/**
* Update the allowlist for all the registered collectors
* @param allowlist The new allowlist
*/
public void updateAllowlist(Pattern allowlist) {
for (MetricsCollector collector : collectors) {
collector.updateAllowlist(allowlist);
}
}
}
Loading

0 comments on commit 5c239a5

Please sign in to comment.