Skip to content

Commit

Permalink
Duplicate metrics should not lead to failure (#24)
Browse files Browse the repository at this point in the history
* Duplicate metrics should not lead to failure

There have been many bugs that have crept in over the years that lead to
some sort of broker failure because sensors were not cleaned up properly
during some normal state transition.

KAFKA-8066 is a good example of this. Even after the fix in 6ca899e for
that bug, we are still seeing the same problem in production.

Rather than trying to find all possible code paths that can lead to
Selector#close not being invoked, this patch just makes duplicate
metrics log an error without throwing.

While duplicate metrics from multiple sensors can be the sign of another
bug, making it fatal seems like a mistake. A common failure we see is
that a replica fetcher thread does not shut down cleanly, leaving behind
some metrics as the thread dies (taking its NetworkClient with
it). Later, when the broker tries to start a replacement replica fetcher
thread in response to a LeaderAndIsr request from the controller, the
fetcher thread cannot start due to this sensor conflict.

Failing to start a replica fetcher in this case can lead to a data loss
situation, since we could end up with the cluster in a state where we
cannot safely restart any brokers to clear up this situation without
offline partitions.

Rather than continuing this trend, we should make sensor duplication
non-fatal. This behavior is configurable and is disabled by default.
  • Loading branch information
ambroff authored May 3, 2019
1 parent e464234 commit b4cc6f0
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ public class CommonClientConfigs {
public static final String METRICS_RECORDING_LEVEL_CONFIG = "metrics.recording.level";
public static final String METRICS_RECORDING_LEVEL_DOC = "The highest recording level for metrics.";

public static final String METRICS_REPLACE_ON_DUPLICATE_CONFIG = "metrics.replace.on.duplicate";
public static final String METRICS_REPLACE_ON_DUPLICATE_DOC = "If set to true, then multiple sensors registering the same metric will not throw, but will instead log an error. This makes sensor registration errors non fatal.";


public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>org.apache.kafka.common.metrics.MetricsReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,13 @@ public class AdminClientConfig extends AbstractConfig {
private static final String METRICS_SAMPLE_WINDOW_MS_DOC = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC;

public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
public static final String METRICS_REPLACE_ON_DUPLICATE_CONFIG = CommonClientConfigs.METRICS_REPLACE_ON_DUPLICATE_CONFIG;

public static final String SECURITY_PROTOCOL_CONFIG = CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
public static final String DEFAULT_SECURITY_PROTOCOL = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
private static final String SECURITY_PROTOCOL_DOC = CommonClientConfigs.SECURITY_PROTOCOL_DOC;
private static final String METRICS_RECORDING_LEVEL_DOC = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC;
private static final String METRICS_REPLACE_ON_DUPLICATE_DOC = CommonClientConfigs.METRICS_REPLACE_ON_DUPLICATE_DOC;

public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;

Expand Down Expand Up @@ -158,6 +160,11 @@ public class AdminClientConfig extends AbstractConfig {
in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()),
Importance.LOW,
METRICS_RECORDING_LEVEL_DOC)
.define(METRICS_REPLACE_ON_DUPLICATE_CONFIG,
Type.BOOLEAN,
false,
Importance.LOW,
METRICS_REPLACE_ON_DUPLICATE_DOC)
// security support
.define(SECURITY_PROTOCOL_CONFIG,
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcesso
.tags(metricTags);
reporters.add(new JmxReporter(JMX_PREFIX));
metrics = new Metrics(metricConfig, reporters, time);
metrics.setReplaceOnDuplicateMetric(config.getBoolean(AdminClientConfig.METRICS_REPLACE_ON_DUPLICATE_CONFIG));
String metricGrpPrefix = "admin-client";
channelBuilder = ClientUtils.createChannelBuilder(config);
selector = new Selector(config.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ public class ConsumerConfig extends AbstractConfig {
*/
public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;

/**
* <code>metrics.replace.on.duplicate</code>
*/
public static final String METRIC_REPLACE_ON_DUPLICATE_CONFIG = CommonClientConfigs.METRICS_REPLACE_ON_DUPLICATE_CONFIG;

/**
* <code>check.crcs</code>
*/
Expand Down Expand Up @@ -404,6 +409,11 @@ public class ConsumerConfig extends AbstractConfig {
new ConfigDef.NonNullValidator(),
Importance.LOW,
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
.define(METRIC_REPLACE_ON_DUPLICATE_CONFIG,
Type.BOOLEAN,
false,
Importance.LOW,
CommonClientConfigs.METRICS_REPLACE_ON_DUPLICATE_DOC)
.define(KEY_DESERIALIZER_CLASS_CONFIG,
Type.CLASS,
Importance.HIGH,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,7 @@ private KafkaConsumer(ConsumerConfig config,
MetricsReporter.class);
reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time);
this.metrics.setReplaceOnDuplicateMetric(config.getBoolean(ConsumerConfig.METRIC_REPLACE_ON_DUPLICATE_CONFIG));
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);

// load interceptors and make sure they get clientId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time);
this.metrics.setReplaceOnDuplicateMetric(config.getBoolean(ProducerConfig.METRICS_REPLACE_ON_DUPLICATE_CONFIG));
ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,11 @@ public class ProducerConfig extends AbstractConfig {
*/
public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;

/**
* <code>metrics.replace.on.duplicate</code>
*/
public static final String METRICS_REPLACE_ON_DUPLICATE_CONFIG = CommonClientConfigs.METRICS_REPLACE_ON_DUPLICATE_CONFIG;

/** <code>metric.reporters</code> */
public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;

Expand Down Expand Up @@ -290,6 +295,11 @@ public class ProducerConfig extends AbstractConfig {
new ConfigDef.NonNullValidator(),
Importance.LOW,
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
.define(METRICS_REPLACE_ON_DUPLICATE_CONFIG,
Type.BOOLEAN,
false,
Importance.LOW,
CommonClientConfigs.METRICS_REPLACE_ON_DUPLICATE_DOC)
.define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
Type.INT,
5,
Expand Down
20 changes: 17 additions & 3 deletions clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public class Metrics implements Closeable {
private final ScheduledThreadPoolExecutor metricsScheduler;
private static final Logger log = LoggerFactory.getLogger(Metrics.class);

private volatile boolean replaceOnDuplicate = false;

/**
* Create a metrics repository with no metric reporters and default configuration.
* Expiration of Sensors is disabled.
Expand Down Expand Up @@ -528,7 +530,7 @@ public synchronized KafkaMetric removeMetric(MetricName metricName) {
try {
reporter.metricRemoval(metric);
} catch (Exception e) {
log.error("Error when removing metric from " + reporter.getClass().getName(), e);
log.error("Error when removing metric " + metricName + " from " + reporter.getClass().getName(), e);
}
}
log.trace("Removed metric named {}", metricName);
Expand All @@ -555,8 +557,16 @@ public synchronized void removeReporter(MetricsReporter reporter) {

synchronized void registerMetric(KafkaMetric metric) {
MetricName metricName = metric.metricName();
if (this.metrics.containsKey(metricName))
throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one.");

if (this.metrics.containsKey(metricName)) {
if (replaceOnDuplicate) {
log.error("The metric " + metricName + " is being replaced since it had already been registered. Please file a bug report.");
removeMetric(metricName);
} else {
throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one.");
}
}

this.metrics.put(metricName, metric);
for (MetricsReporter reporter : reporters) {
try {
Expand Down Expand Up @@ -631,6 +641,10 @@ public MetricName metricInstance(MetricNameTemplate template, Map<String, String
return this.metricName(template.name(), template.group(), template.description(), tags);
}

/**
 * Controls how duplicate metric registration is handled. When enabled, registering a
 * metric whose {@code MetricName} already exists replaces the previous metric and logs
 * an error instead of throwing an {@code IllegalArgumentException}.
 *
 * @param enabled true to replace duplicates non-fatally, false (the default) to throw
 */
public void setReplaceOnDuplicateMetric(boolean enabled) {
    this.replaceOnDuplicate = enabled;
}

/**
* Close this metrics repository.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,24 @@ public void testDuplicateMetricName() {
metrics.sensor("test2").add(metrics.metricName("test", "grp1"), new Total());
}

@Test
public void testDuplicateMetricNameOptionallyReplace() {
    // Opt in to the non-fatal duplicate handling under test.
    metrics.setReplaceOnDuplicateMetric(true);

    int sizeBefore = metrics.metrics().size();
    MetricName name = metrics.metricName("test1", "grp1");

    // First registration adds exactly one metric.
    metrics.addMetric(name, new Count());
    assertEquals(sizeBefore + 1, metrics.metrics().size());

    // Registering the same name again must replace, not throw, and not grow the map.
    metrics.addMetric(name, new Count());
    assertEquals(sizeBefore + 1, metrics.metrics().size());

    // Removal returns the registered metric and leaves no entry behind.
    assertNotNull(metrics.removeMetric(name));
    assertNull(metrics.metrics().get(name));
    assertEquals(sizeBefore, metrics.metrics().size());
}

@Test
public void testQuotas() {
Sensor sensor = metrics.sensor("test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ public void testIdempotentAdd() {
// pass
}

// When replace-on-duplicate is enabled, registering the same metric again replaces it
// and logs an error instead of throwing.
metrics.setReplaceOnDuplicateMetric(true);
final Sensor anotherAnotherSensor = metrics.sensor("another-sensor");
anotherAnotherSensor.add(metrics.metricName("test-metric", "test-group"), new Avg());

// note that adding a different metric with the same name is also a no-op
assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), new Sum()));

Expand Down
5 changes: 5 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ object Defaults {
val MetricSampleWindowMs = 30000
val MetricReporterClasses = ""
val MetricRecordingLevel = Sensor.RecordingLevel.INFO.toString()
val MetricReplaceOnDuplicate = false

/** ********* SSL configuration ***********/
val SslProtocol = SslConfigs.DEFAULT_SSL_PROTOCOL
Expand Down Expand Up @@ -430,6 +431,7 @@ object KafkaConfig {
val MetricNumSamplesProp: String = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG
val MetricReporterClassesProp: String = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG
val MetricRecordingLevelProp: String = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG
val MetricReplaceOnDuplicateProp: String = CommonClientConfigs.METRICS_REPLACE_ON_DUPLICATE_CONFIG

/** ******** Common Security Configuration *************/
val PrincipalBuilderClassProp = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
Expand Down Expand Up @@ -747,6 +749,7 @@ object KafkaConfig {
val MetricNumSamplesDoc = CommonClientConfigs.METRICS_NUM_SAMPLES_DOC
val MetricReporterClassesDoc = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC
val MetricRecordingLevelDoc = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC
val MetricReplaceOnDuplicateDoc = CommonClientConfigs.METRICS_REPLACE_ON_DUPLICATE_DOC

/** ******** Common Security Configuration *************/
val PrincipalBuilderClassDoc = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC
Expand Down Expand Up @@ -988,6 +991,7 @@ object KafkaConfig {
.define(MetricSampleWindowMsProp, LONG, Defaults.MetricSampleWindowMs, atLeast(1), LOW, MetricSampleWindowMsDoc)
.define(MetricReporterClassesProp, LIST, Defaults.MetricReporterClasses, LOW, MetricReporterClassesDoc)
.define(MetricRecordingLevelProp, STRING, Defaults.MetricRecordingLevel, LOW, MetricRecordingLevelDoc)
.define(MetricReplaceOnDuplicateProp, BOOLEAN, Defaults.MetricReplaceOnDuplicate, LOW, MetricReplaceOnDuplicateDoc)

/** ********* Quota configuration ***********/
.define(ProducerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ProducerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ProducerQuotaBytesPerSecondDefaultDoc)
Expand Down Expand Up @@ -1268,6 +1272,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
val metricNumSamples = getInt(KafkaConfig.MetricNumSamplesProp)
val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp)
val metricRecordingLevel = getString(KafkaConfig.MetricRecordingLevelProp)
val metricReplaceOnDuplicate = getBoolean(KafkaConfig.MetricReplaceOnDuplicateProp)

/** ********* SSL/SASL Configuration **************/
// Security configs may be overridden for listeners, so it is not safe to use the base values
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
reporters.add(new JmxReporter(jmxPrefix))
val metricConfig = KafkaServer.metricConfig(config)
metrics = new Metrics(metricConfig, reporters, time, true)
metrics.setReplaceOnDuplicateMetric(config.metricReplaceOnDuplicate)

/* register broker metrics */
_brokerTopicStats = new BrokerTopicStats
Expand Down

0 comments on commit b4cc6f0

Please sign in to comment.